如何在Web套接字连接中以无反应流的形式从订阅者向发布者发送消息

fs123

我的应用程序具有Akka-Websocket界面。Web套接字由一个演员订阅者和一个演员发布者组成。订户通过将命令发送给相应的参与者来处理命令。发布者侦听事件流,然后将更新信息发布回事件流(并最终发布给客户端)。这很好。

我的问题:订户如何将事件发送回流?例如,以确认接收到的命令的执行。

public class WebSocketApp extends HttpApp {

  private static final Gson gson = new Gson();

  @Override
  public Route createRoute() {
    return get(
        path("metrics").route(handleWebSocketMessages(metrics()))
        );
  }

  private Flow<Message, Message, ?> metrics() {
    Sink<Message, ActorRef> metricsSink = Sink.actorSubscriber(WebSocketCommandSubscriber.props());
    Source<Message, ActorRef> metricsSource = 
        Source.actorPublisher(WebSocketDataPublisherActor.props())
        .map((measurementData) -> TextMessage.create(gson.toJson(measurementData)));
    return Flow.fromSinkAndSource(metricsSink, metricsSource);
  }
}

一个很好的解决方案是,订阅的actor(WebSocketCommandSubscriber上面代码中actor)可以将消息发送回流,就像sender().tell(...)...

弗拉基米尔·马特维耶夫(Vladimir Matveev)

不,至少不可能,不是直接。流始终是单向的-所有消息都在一个方向上流动,而对它们的需求则在相反方向上流动。您需要将确认消息从接收器传递到源,以便后者将其发送回客户端,例如,通过在接收器actor中注册源actor。它可能看起来像这样:

Flow.fromSinkAndSourceMat(metricsSink, metricsSource, (sinkActor, sourceActor) -> {
    sinkActor.tell(new RegisterSource(sourceActor), null);
})

然后,接收器actor收到RegisterSource消息后,可以将确认消息发送到提供的ActorRef,然后将其转发到输出流。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

如何在Redis Rails中订阅多个发布者?

来自分类Dev

如何从广播的Akka流中获取订阅者和发布者?

来自分类Dev

RabbitMQ发布者-多个订阅者-如何在.Net中设置?

来自分类Dev

使用新的订阅者连接(MQTT / Mosquitto)显示来自发布者的错过的消息

来自分类Dev

如何在订阅代码 mqtt 中打印发布者属性?

来自分类Dev

我们如何使用 jmeter AMQP 发布者插件向每个线程发送不同的消息?

来自分类Dev

ReactJS中的发布者/订阅者模型

来自分类Dev

nanomsg (nng) 中的多个发布者和订阅者

来自分类Dev

ros python发布者/订阅者

来自分类Dev

如何在发布者中获得事件的响应?

来自分类Dev

IBM MQ发布/订阅向一个订阅者发送消息

来自分类Dev

如何编写向其第二个订阅者发出最后发出的值的联合发布者

来自分类Dev

发布者仅在无限循环中发送消息

来自分类Dev

Redis发布/订阅-发布者也是订阅者?

来自分类Dev

订阅返回的发布者后,如何触发流程?

来自分类Dev

Spring RabbitTemplate-如何在发布者确认模式下保留NACK的已发布消息

来自分类Dev

NATS 发布者能否将一条消息发送到多个队列中?

来自分类Dev

Akka 流:将新的发布者/订阅者附加到 Flow

来自分类Dev

Web日历发布(.ical或ics)是由发布者推送还是由订阅者推送?

来自分类Dev

如何在没有自定义订阅者的情况下调整Combine的发布者需求?

来自分类Dev

如何设置Akka流发布者的inputBuffer?

来自分类Dev

Opentok:如何正确杀死流/发布者?

来自分类Dev

发布者等待消费消息,同时它应该做两件事(发布和订阅订阅者的回复)。)

来自分类Dev

如何更改资产发布者默认未找到消息?

来自分类Dev

Jenkins中的ZMQ事件发布者未发送通知

来自分类Dev

我如何默认将发布者证书安装到“受信任的发布者”中?

来自分类Dev

发布者订阅者模式的现代替代方案

来自分类Dev

订阅者无法读取发布者的图片

来自分类Dev

使用$ .deferred作为发布者/订阅者?

Related 相关文章

  1. 1

    如何在Redis Rails中订阅多个发布者?

  2. 2

    如何从广播的Akka流中获取订阅者和发布者?

  3. 3

    RabbitMQ发布者-多个订阅者-如何在.Net中设置?

  4. 4

    使用新的订阅者连接(MQTT / Mosquitto)显示来自发布者的错过的消息

  5. 5

    如何在订阅代码 mqtt 中打印发布者属性?

  6. 6

    我们如何使用 jmeter AMQP 发布者插件向每个线程发送不同的消息?

  7. 7

    ReactJS中的发布者/订阅者模型

  8. 8

    nanomsg (nng) 中的多个发布者和订阅者

  9. 9

    ros python发布者/订阅者

  10. 10

    如何在发布者中获得事件的响应?

  11. 11

    IBM MQ发布/订阅向一个订阅者发送消息

  12. 12

    如何编写向其第二个订阅者发出最后发出的值的联合发布者

  13. 13

    发布者仅在无限循环中发送消息

  14. 14

    Redis发布/订阅-发布者也是订阅者?

  15. 15

    订阅返回的发布者后,如何触发流程?

  16. 16

    Spring RabbitTemplate-如何在发布者确认模式下保留NACK的已发布消息

  17. 17

    NATS 发布者能否将一条消息发送到多个队列中?

  18. 18

    Akka 流:将新的发布者/订阅者附加到 Flow

  19. 19

    Web日历发布(.ical或ics)是由发布者推送还是由订阅者推送?

  20. 20

    如何在没有自定义订阅者的情况下调整Combine的发布者需求?

  21. 21

    如何设置Akka流发布者的inputBuffer?

  22. 22

    Opentok:如何正确杀死流/发布者?

  23. 23

    发布者等待消费消息,同时它应该做两件事(发布和订阅订阅者的回复)。)

  24. 24

    如何更改资产发布者默认未找到消息?

  25. 25

    Jenkins中的ZMQ事件发布者未发送通知

  26. 26

    我如何默认将发布者证书安装到“受信任的发布者”中?

  27. 27

    发布者订阅者模式的现代替代方案

  28. 28

    订阅者无法读取发布者的图片

  29. 29

    使用$ .deferred作为发布者/订阅者?

热门标签

归档