Akka持久性查询事件流和CQRS

布拉尼斯拉夫·拉齐奇(Branislav Lazic)

我正在尝试在我的ES-CQRS架构中实现读取端。假设我有一个像这样的执着演员:

object UserWrite {

  sealed trait UserEvent
  sealed trait State
  case object Uninitialized extends State
  case class User(username: String, password: String) extends State
  case class AddUser(user: User)
  case class UserAdded(user: User) extends UserEvent
  case class UserEvents(userEvents: Source[(Long, UserEvent), NotUsed])
  case class UsersStream(fromSeqNo: Long)
  case object GetCurrentUser

  def props = Props(new UserWrite)
}

class UserWrite extends PersistentActor {

  import UserWrite._

  private var currentUser: State = Uninitialized

  override def persistenceId: String = "user-write"

  override def receiveRecover: Receive = {
    case UserAdded(user) => currentUser = user
  }

  override def receiveCommand: Receive = {
    case AddUser(user: User) => persist(UserAdded(user)) {
      case UserAdded(`user`) => currentUser = user
    }
    case UsersStream(fromSeqNo: Long) => publishUserEvents(fromSeqNo)
    case GetCurrentUser => sender() ! currentUser
  }

  def publishUserEvents(fromSeqNo: Long) = {
    val readJournal = PersistenceQuery(context.system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
    val userEvents = readJournal
      .eventsByPersistenceId("user-write", fromSeqNo, Long.MaxValue)
      .map { case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event }
    sender() ! UserEvents(userEvents)
  }
}

据我了解,每次事件持续存在时,我们都可以通过发布Akka Persistence Query现在,我不确定订阅这些事件的正确方法是什么,以便可以将其持久保存在我的读取方数据库中?一种想法是首先UsersStream从我的阅读方演员向UserWrite演员发送消息,并在该阅读演员中“下沉”事件。

编辑

根据@cmbaxter的建议,我以这种方式实现了read side:

object UserRead {

  case object GetUsers
  case class GetUserByUsername(username: String)
  case class LastProcessedEventOffset(seqNo: Long)
  case object StreamCompleted

  def props = Props(new UserRead)
}

class UserRead extends PersistentActor {
  import UserRead._

  var inMemoryUsers = Set.empty[User]
  var offset        = 0L

  override val persistenceId: String = "user-read"

  override def receiveRecover: Receive = {
    // Recovery from snapshot will always give us last sequence number
    case SnapshotOffer(_, LastProcessedEventOffset(seqNo)) => offset = seqNo
    case RecoveryCompleted                                 => recoveryCompleted()
  }

  // After recovery is being completed, events will be projected to UserRead actor
  def recoveryCompleted(): Unit = {
    implicit val materializer = ActorMaterializer()
    PersistenceQuery(context.system)
      .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
      .eventsByPersistenceId("user-write", offset + 1, Long.MaxValue)
      .map {
        case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event
      }
      .runWith(Sink.actorRef(self, StreamCompleted))
  }

  override def receiveCommand: Receive = {
    case GetUsers                    => sender() ! inMemoryUsers
    case GetUserByUsername(username) => sender() ! inMemoryUsers.find(_.username == username)
    // Match projected event and update offset
    case (seqNo: Long, UserAdded(user)) =>
      saveSnapshot(LastProcessedEventOffset(seqNo))
      inMemoryUsers += user
  }
}

有一些问题,例如:事件流似乎很慢。UserRead演员可以在保存新添加的用户之前与一组用户进行应答。

编辑2

我增加了cassandra查询日志的刷新间隔,从而减少了事件流缓慢带来的问题。默认情况下,似乎Cassandra事件日志是每3秒进行一次轮询。在我的application.conf补充中:

cassandra-query-journal {
  refresh-interval = 20ms
}

编辑3

实际上,请勿减少刷新间隔。这将增加内存使用量,但这并不危险,一点也不重要。CQRS的一般概念是写入和读取端是异步的。因此,写入后,数据将永远无法立即读取。处理UI?在读取端确认后,我只是打开流并通过服务器发送的事件推送数据。

擦0

有一些方法可以做到这一点。例如,在我的应用程序中,我在查询端有一个演员,该演员具有一个PersistenceQuery,该PersistenceQuery一直在寻找更改,但是您也可以在一个线程中使用相同的查询。事情是保持流打开,以便能够在发生持久事件后立即读取它

val readJournal =
PersistenceQuery(system).readJournalFor[CassandraReadJournal](
  CassandraReadJournal.Identifier)

// issue query to journal
val source: Source[EventEnvelope, NotUsed] =
  readJournal.eventsByPersistenceId(s"MyActorId", 0, Long.MaxValue)

// materialize stream, consuming events
implicit val mat = ActorMaterializer()
source.map(_.event).runForeach{
  case userEvent: UserEvent => {
    doSomething(userEvent)
  }
}

取而代之的是,您可以使用一个引发PersistenceQuery并存储新事件的计时器,但是我认为打开流是最好的方法

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

Akka持久性查询和参与者分片

来自分类Dev

如何使用Akka持久性查询检索所有日记帐事件?

来自分类Dev

具有持久性忽略对象的持久性和域事件

来自分类Dev

具有持久性忽略对象的持久性和域事件

来自分类Dev

Nestjs事件源-事件持久性

来自分类Dev

SOAP 和持久性

来自分类Dev

未处理邮件的Akka持久性

来自分类Dev

Akka.Net与MongoDB的持久性

来自分类Dev

JMS持久性和持久性

来自分类Dev

具有akka持久性的事件源:列表的增长状态?

来自分类Dev

Java持久性查询语言-LIKE

来自分类Dev

Akka.net与Cassandra和动态键空间的持久性

来自分类Dev

持久性无知和REST

来自分类Dev

改造和持久性Cookie存储

来自分类Dev

聊天服务和持久性

来自分类Dev

B树和磁盘持久性

来自分类Dev

休眠和持久性。注释错误

来自分类Dev

聊天服务和持久性

来自分类Dev

持久性和裂脑方案

来自分类Dev

使用Play框架实现akka持久性

来自分类Dev

如何为akka.net启用消息持久性

来自分类Dev

Akka 持久性:ReadJournal.runFold 永远不会返回

来自分类Dev

使用Java持久性查询语言在表中插入和更新数据

来自分类Dev

工作流迁移:保留持久性价值?

来自分类Dev

诊断DRM内容的持久性Flash流故障

来自分类Dev

类路径中的ejb3持久性和javax持久性jar文件?

来自分类Dev

持久性XSS与非持久性XSS之间的主要区别和定义

来自分类Dev

此Java持久性查询有什么问题?

来自分类Dev

SQL查询分组依据(计算持久性)