我的应用程序具有Akka-Websocket界面。Web套接字由一个演员订阅者和一个演员发布者组成。订户通过将命令发送给相应的参与者来处理命令。发布者侦听事件流,然后将更新信息发布回事件流(并最终发布给客户端)。这很好。
我的问题:订户如何将事件发送回流?例如,以确认接收到的命令的执行。
public class WebSocketApp extends HttpApp {
private static final Gson gson = new Gson();
@Override
public Route createRoute() {
return get(
path("metrics").route(handleWebSocketMessages(metrics()))
);
}
private Flow<Message, Message, ?> metrics() {
Sink<Message, ActorRef> metricsSink = Sink.actorSubscriber(WebSocketCommandSubscriber.props());
Source<Message, ActorRef> metricsSource =
Source.actorPublisher(WebSocketDataPublisherActor.props())
.map((measurementData) -> TextMessage.create(gson.toJson(measurementData)));
return Flow.fromSinkAndSource(metricsSink, metricsSource);
}
}
一个很好的解决方案是,订阅的actor(WebSocketCommandSubscriber
上面代码中的actor)可以将消息发送回流,就像sender().tell(...)
...
不,至少不可能,不是直接。流始终是单向的-所有消息都在一个方向上流动,而对它们的需求则在相反方向上流动。您需要将确认消息从接收器传递到源,以便后者将其发送回客户端,例如,通过在接收器actor中注册源actor。它可能看起来像这样:
Flow.fromSinkAndSourceMat(metricsSink, metricsSource, (sinkActor, sourceActor) -> {
sinkActor.tell(new RegisterSource(sourceActor), null);
})
然后,接收器actor收到RegisterSource
消息后,可以将确认消息发送到提供的ActorRef
,然后将其转发到输出流。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句