Akka持久性查询和参与者分片

里布克

我正在做CQRS Akka actors应用程序的查询端。

查询参与者被设置为群集分片,并填充了一个持久性查询流中的事件。

我的问题是:

  1. 如果群集分片中的参与者之一重新启动,该如何恢复呢?

    • 关闭整个群集分片并回复所有事件?
    • 使集群碎片中的参与者成为持久参与者,并仅将新的事件集保存给查询方?
  2. 如果用Persistence Query填充的actor重新启动,如何取消当前的PQ并重新启动它?

蒂维根

如前所述,我将评估将查询端持久存储在数据库中的情况。

如果这不是一个选择,并且您希望坚持使用每个分片的单个持久性查询,请在查询参与者中执行以下操作:

var inRecovery: Boolean = true;

override def preStart( ) = {
    //Subscribe to your event live stream now, so you don't miss anything during recovery
    // e.g. send Subscription message to your persistence query actor

    //Re-Read everything up to now for recovery
    readJournal.currentEventsByPersistenceId("persistenceId")
        .watchTermination()((_, f) => f pipeTo self) // Send Done to self after recovery is finished
        .map(Replay.apply) // Mark your replay messages
        .runWith( Sink.actorRef( self, tag ) ) // Send all replay events to self
}

override def receive = {
    case Done => // Recovery is finished
        inRecovery = false
        unstashAll() // unstash all normal messages received during recovery

    case Replay( payload ) =>
        //handle replayed messages

    case events: Event =>
        //handle normal events from your persistence query
        inRecovery match {
            case true => stash() // stash normal messages until recovery is done
            case false => 
                // recovery is done, start handling normal events
        }
}


case class Replay( payload: AnyRef )

因此,基本上在参与者开始订阅持久性查询参与者之前,使用所有过去事件的有限流恢复状态,该事件在所有事件都经过后才终止。在恢复期间,将隐藏所有传入事件,这些事件不是重播事件。然后,在完成恢复后,取消隐藏所有内容并开始处理正常消息。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

在Akka中对持久性参与者进行单元测试

来自分类Dev

Akka持久性参与者的用例是什么?

来自分类Dev

Lagom消息在持久性参与者和读取处理器之间的持久性

来自分类Dev

具有持久参与者的 Akka Sharding 的背压

来自分类Dev

Akka持久性查询事件流和CQRS

来自分类Dev

Akka.NET查询参与者有效

来自分类Dev

Akka消息和参与者的命名约定

来自分类Dev

Akka启动和访问参与者返回的数据

来自分类Dev

Akka.NET参与者和包装器(可能带有Rx)

来自分类Dev

具有Akka和长期运行流程的参与者模式

来自分类Dev

Akka:参与者之间沟通和处理特殊状态(无错误)

来自分类Dev

具有参与者的Akka群集节点离开

来自分类Dev

Akka:在参与者系统之外进行交流?

来自分类Dev

Akka参与者之间的工作负载平衡

来自分类Dev

Akka是否根据可用资源来规范参与者?

来自分类Dev

SOAP 和持久性

来自分类Dev

未处理邮件的Akka持久性

来自分类Dev

Akka.Net与MongoDB的持久性

来自分类Dev

JMS持久性和持久性

来自分类Dev

如何使用Akka持久性查询检索所有日记帐事件?

来自分类Dev

Java持久性查询语言-LIKE

来自分类Dev

什么是等效的OpenID Connect和SAML参与者/角色?

来自分类Dev

MYSQL计数响应者和总参与者

来自分类Dev

Service Fabric 参与者、ReceiveReminderAsync 和 CancellationTokens

来自分类Dev

Akka.net与Cassandra和动态键空间的持久性

来自分类Dev

持久性无知和REST

来自分类Dev

改造和持久性Cookie存储

来自分类Dev

聊天服务和持久性

来自分类Dev

B树和磁盘持久性