根据源的物化值创建接收器

塔布杜拉迪

我想围绕Pubsub主题构建流程

\-----------------------------------------------------\
 \  ------------------  ------------    -------------  \
  > > wrapWithPublish > > toPubsub |    | fromPubsub >  >
 /  ------------------  ------------    -------------  /
/-----------------------------------------------------/

这是我到目前为止编写的代码

def mediatorFlow[In, Out](mediator: ActorRef, topic: String): Flow[In, Out, Unit] = {
  val source =
    Source
      .actorRef[Out](10, OverflowStrategy.dropHead)
      .mapMaterializedValue { ref => mediator ! DistributedPubSubMediator.Subscribe(topic, ref); ref }

  val wrapWithPublish =
    Flow[In].map(DistributedPubSubMediator.Publish(topic, _))

  val unsubscribe = DistributedPubSubMediator.Unsubscribe(topic, ref???)

  val toPubsub =
    Sink.actorRef[DistributedPubSubMediator.Publish](mediator, unsubscribe)

  Flow.fromSinkAndSource(wrapWithPublish to toPubsub, source)
}

问题在于的定义unsubscribe,我想DistributedPubSubMediator.Subscribe在流的末尾发送a ,它指定aref应该是source上面定义的物化值

我知道,当Pubsub在流的末尾死亡时,它将自动取消订阅Actor。但是我很好奇无论如何如何解决问题。

罗兰·库恩

为了实现这一点,您将需要构建一个比紧密结合的Flow fromSinkAndSource,您将需要使用GraphDSL:

val source = ... // as above
Flow.fromGraph(GraphDSL.create(source) { implicit b =>
  src =>
    import GraphDSL.Implicits._
    val concat = b.add(Concat[Any](2))
    val wrapWithPublish = b.add(Flow[In].map(DistributedPubSubMediator.Publish(topic, _)))
    val toPubSub = b.add(Sink.actorRef[Any](mediator, unsubscribe))

    wrapWithPublish ~> concat ~> toPubSub
    b.materializedValue.map(DistributedPubSubMediator.Unsubscribe(topic, _)) ~> concat

    FlowShape(wrapWithPublish.in, src.out)
})

这样,您可以将其中一部分的物化值注入到流元素的级别中,以使其易于发送给pubsub介体。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

样式媒体接收器源(Chromecast)

来自分类Dev

“接收器输出,源输出,接收器卸载,源卸载”对GPU意味着什么?

来自分类Dev

Haskell管道:让接收器根据上游的值返回一个值

来自分类Dev

创建 JMS 接收器的框架

来自分类Dev

方法集(指针与值接收器)

来自分类Dev

Akka流-将接收器连接到源吗?

来自分类Dev

是否有Google Dataflow MongoDB源/接收器?

来自分类Dev

网络摄像头源到EVR接收器

来自分类Dev

USRP_UHD Redhawk的源和接收器

来自分类Dev

Pulseaudio虚拟接收器到虚拟源

来自分类Dev

PulseAudio作为远程源*和*接收器?

来自分类Dev

同时从 Pulseaudio 接收器和源录制

来自分类Dev

如何将插孔音频接收器和插孔音频源设置为默认接收器和源

来自分类Dev

防止接收器接收值关闭立即被调用

来自分类Dev

快速合并接收器接收值内存泄漏

来自分类Dev

指针接收器和值接收器在Golang中是什么意思?

来自分类Dev

Iris框架在实现指针接收器和值接收器方面的差异

来自分类Dev

接收器配置

来自分类Dev

广播接收器

来自分类Dev

接收器配置

来自分类Dev

为广播接收器创建多个AlarmManager?

来自分类Dev

如何在Kotlin中使用接收器创建Lambda

来自分类Dev

如何为Spark Streaming创建MQTT接收器?

来自分类Dev

GCP:无法通过gcloud / API创建项目接收器

来自分类Dev

从IntentService创建的接收器不起作用

来自分类Dev

以编程方式创建静态android广播接收器

来自分类Dev

Swift Combine不会按接收器更新值

来自分类Dev

Android:将值从活动传递到广播接收器

来自分类Dev

Kafka JDBC 接收器不处理空值