我试过:EventStream -> Source -> Akka HTTP (SSE)
在我看来,这是行不通的,因为源将由 Akka HTTP 物化,complete(Source, ...)
并将消息从 EventStream 发送到物化的源,我需要 ActorRef(有没有办法获取该 ActorRef?)
我在 GitHub 上找到了一个使用 ActorPublisher 的解决方案:https : //github.com/calvinlfer/Akka-HTTP-Akka-Streams-Akka-Actors-Integration
但是由于 ActorPublisher 是一个内部 API,我仍然希望有一个干净的解决方案。
您可以使用Source.actorRef
来创建Source
将事件流元素转换为ServerSentEvent
实例的 和BroadcastHub.sink
,其方式如下:
val (sseActor, sseSource) =
Source.actorRef[EventStreamMessageOrWhatever](10, akka.stream.OverflowStrategy.dropTail)
.map(s => /* convert event stream elements to ServerSideEvent */)
.keepAlive(1.second, () => ServerSentEvent.heartbeat)
.toMat(BroadcastHub.sink[ServerSentEvent])(Keep.both)
.run()
如果有下游需求,则发送到物化的消息(即事件流元素)向ActorRef
下游发出。如果没有下游需求,则使用指定的溢出策略将消息缓冲到一定数量(在此示例中,缓冲区大小为 10)。
然后,您可以将物化 actor 订阅到EventStream
:
eventStream.subscribe(sseActor, ...)
物化Source
可用于您的路径:
path("sse") {
get {
complete(sseSource)
}
}
请注意,这种方法没有背压。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句