如何从广播的Akka流中获取订阅者和发布者?

里普拉

使用更复杂的图形时,在使发布者和订阅者脱离流程时遇到了问题。我的目标是提供发布者和订阅者的API,并在内部运行Akka流。这是我的第一次尝试,效果很好。

val subscriberSource = Source.subscriber[Boolean]
val someFunctionSink = Sink.foreach(Console.println)

val flow = subscriberSource.to(someFunctionSink)

//create Reactive Streams Subscriber
val subscriber: Subscriber[Boolean] = flow.run()

//prints true
Source.single(true).to(Sink(subscriber)).run()

但是,对于更复杂的广播图,我不确定如何获取Subscriber和Publisher对象?我需要局部图吗?

val subscriberSource = Source.subscriber[Boolean]
val someFunctionSink = Sink.foreach(Console.println)
val publisherSink = Sink.publisher[Boolean]

FlowGraph.closed() { implicit builder =>
  import FlowGraph.Implicits._

  val broadcast = builder.add(Broadcast[Boolean](2))

  subscriberSource ~> broadcast.in
  broadcast.out(0) ~> someFunctionSink
  broadcast.out(1) ~> publisherSink
}.run()

val subscriber: Subscriber[Boolean] = ???
val publisher: Publisher[Boolean] = ???
鲁道夫

当您调用RunnableGraph.run()流时,运行结果是该运行的“物化值”。

在您简单的例子物化的价值Source.subscriber[Boolean]就是Subscriber[Boolean]在您的复杂示例中,您希望将图形的几个组成部分的物化值合并为一个元组的物化值(Subscriber[Boolean], Publisher[Boolean])

为此,您可以将您感兴趣的组件的物化值传递给它们FlowGraph.closed(),然后指定一个函数来组合物化值:

import akka.stream.scaladsl._
import org.reactivestreams._

val subscriberSource = Source.subscriber[Boolean]
val someFunctionSink = Sink.foreach(Console.println)
val publisherSink = Sink.publisher[Boolean]

val graph =
  FlowGraph.closed(subscriberSource, publisherSink)(Keep.both) { implicit builder ⇒
    (in, out) ⇒
      import FlowGraph.Implicits._

      val broadcast = builder.add(Broadcast[Boolean](2))

      in ~> broadcast.in
      broadcast.out(0) ~> someFunctionSink
      broadcast.out(1) ~> out
  }
val (subscriber: Subscriber[Boolean], publisher: Publisher[Boolean]) = graph.run()

有关FlowGraph.closed重载的更多信息,请参见Scaladocs

Keep.both是功能的缩写(a, b) => (a, b)

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

nanomsg (nng) 中的多个发布者和订阅者

来自分类Dev

如何设置Akka流发布者的inputBuffer?

来自分类Dev

Akka 流:将新的发布者/订阅者附加到 Flow

来自分类Dev

如何在Web套接字连接中以无反应流的形式从订阅者向发布者发送消息

来自分类Dev

如何在Redis Rails中订阅多个发布者?

来自分类Dev

opentok:发布者和订阅者视频显示相同

来自分类Dev

ReactJS中的发布者/订阅者模型

来自分类Dev

RabbitMQ发布者-多个订阅者-如何在.Net中设置?

来自分类Dev

ros python发布者/订阅者

来自分类Dev

Redis发布/订阅-发布者也是订阅者?

来自分类Dev

订阅服务器和发布者在ROS中的一个文件中

来自分类Dev

订阅返回的发布者后,如何触发流程?

来自分类Dev

如何在订阅代码 mqtt 中打印发布者属性?

来自分类Dev

Opentok:如何正确杀死流/发布者?

来自分类Dev

我如何默认将发布者证书安装到“受信任的发布者”中?

来自分类Dev

处理多个主题,发布者和订阅者时出现错误+ WSO2MB

来自分类Dev

Android Opentok SDK 2.0,订阅者和发布者音频

来自分类Dev

发布者等待消费消息,同时它应该做两件事(发布和订阅订阅者的回复)。)

来自分类Dev

发布者订阅者模式的现代替代方案

来自分类Dev

订阅者无法读取发布者的图片

来自分类Dev

使用$ .deferred作为发布者/订阅者?

来自分类Dev

pyzmq SUB订阅者如何检测到离线PUB发布者?

来自分类Dev

DirectProcessor 和 UnicastProcessor 可以在不应该订阅上游发布者时订阅。为什么?

来自分类Dev

发布/订阅中的发布者应该是同步的还是异步的?

来自分类Dev

Spring AMQP中的“发布者退货”如何发生/工作?

来自分类Dev

在SwiftUI中异步更新后如何触发发布者?

来自分类Dev

如何在发布者中获得事件的响应?

来自分类Dev

Nservice总线...发布者订阅者正在筛选订阅者

来自分类Dev

如何从两个发布者A和B创建一个Swift Combine发布者,其中发布者B消耗发布者A的价值?

Related 相关文章

  1. 1

    nanomsg (nng) 中的多个发布者和订阅者

  2. 2

    如何设置Akka流发布者的inputBuffer?

  3. 3

    Akka 流:将新的发布者/订阅者附加到 Flow

  4. 4

    如何在Web套接字连接中以无反应流的形式从订阅者向发布者发送消息

  5. 5

    如何在Redis Rails中订阅多个发布者?

  6. 6

    opentok:发布者和订阅者视频显示相同

  7. 7

    ReactJS中的发布者/订阅者模型

  8. 8

    RabbitMQ发布者-多个订阅者-如何在.Net中设置?

  9. 9

    ros python发布者/订阅者

  10. 10

    Redis发布/订阅-发布者也是订阅者?

  11. 11

    订阅服务器和发布者在ROS中的一个文件中

  12. 12

    订阅返回的发布者后,如何触发流程?

  13. 13

    如何在订阅代码 mqtt 中打印发布者属性?

  14. 14

    Opentok:如何正确杀死流/发布者?

  15. 15

    我如何默认将发布者证书安装到“受信任的发布者”中?

  16. 16

    处理多个主题,发布者和订阅者时出现错误+ WSO2MB

  17. 17

    Android Opentok SDK 2.0,订阅者和发布者音频

  18. 18

    发布者等待消费消息,同时它应该做两件事(发布和订阅订阅者的回复)。)

  19. 19

    发布者订阅者模式的现代替代方案

  20. 20

    订阅者无法读取发布者的图片

  21. 21

    使用$ .deferred作为发布者/订阅者?

  22. 22

    pyzmq SUB订阅者如何检测到离线PUB发布者?

  23. 23

    DirectProcessor 和 UnicastProcessor 可以在不应该订阅上游发布者时订阅。为什么?

  24. 24

    发布/订阅中的发布者应该是同步的还是异步的?

  25. 25

    Spring AMQP中的“发布者退货”如何发生/工作?

  26. 26

    在SwiftUI中异步更新后如何触发发布者?

  27. 27

    如何在发布者中获得事件的响应?

  28. 28

    Nservice总线...发布者订阅者正在筛选订阅者

  29. 29

    如何从两个发布者A和B创建一个Swift Combine发布者,其中发布者B消耗发布者A的价值?

热门标签

归档