我正在做CQRS Akka actors应用程序的查询端。
查询参与者被设置为群集分片,并填充了一个持久性查询流中的事件。
我的问题是:
如果群集分片中的参与者之一重新启动,该如何恢复呢?
如果用Persistence Query填充的actor重新启动,如何取消当前的PQ并重新启动它?
如前所述,我将评估将查询端持久存储在数据库中的情况。
如果这不是一个选择,并且您希望坚持使用每个分片的单个持久性查询,请在查询参与者中执行以下操作:
var inRecovery: Boolean = true;
override def preStart( ) = {
//Subscribe to your event live stream now, so you don't miss anything during recovery
// e.g. send Subscription message to your persistence query actor
//Re-Read everything up to now for recovery
readJournal.currentEventsByPersistenceId("persistenceId")
.watchTermination()((_, f) => f pipeTo self) // Send Done to self after recovery is finished
.map(Replay.apply) // Mark your replay messages
.runWith( Sink.actorRef( self, tag ) ) // Send all replay events to self
}
override def receive = {
case Done => // Recovery is finished
inRecovery = false
unstashAll() // unstash all normal messages received during recovery
case Replay( payload ) =>
//handle replayed messages
case events: Event =>
//handle normal events from your persistence query
inRecovery match {
case true => stash() // stash normal messages until recovery is done
case false =>
// recovery is done, start handling normal events
}
}
case class Replay( payload: AnyRef )
因此,基本上在参与者开始订阅持久性查询参与者之前,使用所有过去事件的有限流恢复状态,该事件在所有事件都经过后才终止。在恢复期间,将隐藏所有传入事件,这些事件不是重播事件。然后,在完成恢复后,取消隐藏所有内容并开始处理正常消息。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句