我正在尝试在我的ES-CQRS架构中实现读取端。假设我有一个像这样的执着演员:
object UserWrite {
sealed trait UserEvent
sealed trait State
case object Uninitialized extends State
case class User(username: String, password: String) extends State
case class AddUser(user: User)
case class UserAdded(user: User) extends UserEvent
case class UserEvents(userEvents: Source[(Long, UserEvent), NotUsed])
case class UsersStream(fromSeqNo: Long)
case object GetCurrentUser
def props = Props(new UserWrite)
}
class UserWrite extends PersistentActor {
import UserWrite._
private var currentUser: State = Uninitialized
override def persistenceId: String = "user-write"
override def receiveRecover: Receive = {
case UserAdded(user) => currentUser = user
}
override def receiveCommand: Receive = {
case AddUser(user: User) => persist(UserAdded(user)) {
case UserAdded(`user`) => currentUser = user
}
case UsersStream(fromSeqNo: Long) => publishUserEvents(fromSeqNo)
case GetCurrentUser => sender() ! currentUser
}
def publishUserEvents(fromSeqNo: Long) = {
val readJournal = PersistenceQuery(context.system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
val userEvents = readJournal
.eventsByPersistenceId("user-write", fromSeqNo, Long.MaxValue)
.map { case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event }
sender() ! UserEvents(userEvents)
}
}
据我了解,每次事件持续存在时,我们都可以通过发布Akka Persistence Query
。现在,我不确定订阅这些事件的正确方法是什么,以便可以将其持久保存在我的读取方数据库中?一种想法是首先UsersStream
从我的阅读方演员向UserWrite
演员发送消息,并在该阅读演员中“下沉”事件。
编辑
根据@cmbaxter的建议,我以这种方式实现了read side:
object UserRead {
case object GetUsers
case class GetUserByUsername(username: String)
case class LastProcessedEventOffset(seqNo: Long)
case object StreamCompleted
def props = Props(new UserRead)
}
class UserRead extends PersistentActor {
import UserRead._
var inMemoryUsers = Set.empty[User]
var offset = 0L
override val persistenceId: String = "user-read"
override def receiveRecover: Receive = {
// Recovery from snapshot will always give us last sequence number
case SnapshotOffer(_, LastProcessedEventOffset(seqNo)) => offset = seqNo
case RecoveryCompleted => recoveryCompleted()
}
// After recovery is being completed, events will be projected to UserRead actor
def recoveryCompleted(): Unit = {
implicit val materializer = ActorMaterializer()
PersistenceQuery(context.system)
.readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
.eventsByPersistenceId("user-write", offset + 1, Long.MaxValue)
.map {
case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event
}
.runWith(Sink.actorRef(self, StreamCompleted))
}
override def receiveCommand: Receive = {
case GetUsers => sender() ! inMemoryUsers
case GetUserByUsername(username) => sender() ! inMemoryUsers.find(_.username == username)
// Match projected event and update offset
case (seqNo: Long, UserAdded(user)) =>
saveSnapshot(LastProcessedEventOffset(seqNo))
inMemoryUsers += user
}
}
有一些问题,例如:事件流似乎很慢。即UserRead
演员可以在保存新添加的用户之前与一组用户进行应答。
编辑2
我增加了cassandra查询日志的刷新间隔,从而减少了事件流缓慢带来的问题。默认情况下,似乎Cassandra事件日志是每3秒进行一次轮询。在我的application.conf
补充中:
cassandra-query-journal {
refresh-interval = 20ms
}
编辑3
实际上,请勿减少刷新间隔。这将增加内存使用量,但这并不危险,一点也不重要。CQRS的一般概念是写入和读取端是异步的。因此,写入后,数据将永远无法立即读取。处理UI?在读取端确认后,我只是打开流并通过服务器发送的事件推送数据。
有一些方法可以做到这一点。例如,在我的应用程序中,我在查询端有一个演员,该演员具有一个PersistenceQuery,该PersistenceQuery一直在寻找更改,但是您也可以在一个线程中使用相同的查询。事情是保持流打开,以便能够在发生持久事件后立即读取它
val readJournal =
PersistenceQuery(system).readJournalFor[CassandraReadJournal](
CassandraReadJournal.Identifier)
// issue query to journal
val source: Source[EventEnvelope, NotUsed] =
readJournal.eventsByPersistenceId(s"MyActorId", 0, Long.MaxValue)
// materialize stream, consuming events
implicit val mat = ActorMaterializer()
source.map(_.event).runForeach{
case userEvent: UserEvent => {
doSomething(userEvent)
}
}
取而代之的是,您可以使用一个引发PersistenceQuery并存储新事件的计时器,但是我认为打开流是最好的方法
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句