将导管合并为一个

阿列克谢·拉加(Alexey Raga)

我正在寻找一个可以执行以下操作的函数:

merge :: MonadIO m => [Producer m a] -> Producer m a

我快速浏览了一下stm-conduit,它看起来很相似,但是不确定是否符合我的要求:

messagesSource :: MonadIO m => AmqpConn -> Ack -> Text -> Producer m (Message, Envelope)
messagesSource conn ack q = loop
  where
    loop = do
      mmsg <- liftIO $ getMsg chan ack q
      case mmsg of
        Just (m, e) -> do
          yield (m, e)
          liftIO $ ackMsg chan (envDeliveryTag e) False
          loop
        Nothing     -> loop
    chan = fst $ amqpChan conn

如您所见,该管道生产者在产生消息后会对其进行确认。在简单的“单线程”管道中,它运行良好,该消息进入接收器,然后被确认。

但是,stm-conduit这种情况可能会改变,因为据我所知,生产者不会等待消息接收器使用该消息,而是会并行工作,并且消息可能会过早地被确认。

我的理解stm-conduit正确吗?
将单独的源合并为一个具有良好的单流语义的方法是什么?

更新:根据要求将代码更新为实际工作的AMQP示例(但是可能有点吵)。

更新2:我认为我所追求的可能是管道源的另类实例,因此我可以做类似的事情let src = src1 <|> src2有可能吗?

Zakyggaps

mergeSourcesstm-conduit保持TBMChannel幕后。您所有的来源/生产者都首先连接到TBMChannel,然后它将创建一个单一的来源,尝试从通道FIFO中提取值。

您可以设置绑定,中间的TBMChannel用时mergeSources假设您将bound设置为n,那么所有Source产生的前n个值将被转储到theTBMChannel和the AmqpConnnow,假设它在AmqpConn末尾没有被阻塞,并且您的使用者比source慢(BTWAmqpConn使用无界,Control.Concurrent.Chan因此它赢得了不会阻止)。此后,TBMChannel已满,因此将阻止试图为该通道产生值的源。您的消费者从合并来源中一个一个地获取价值,因此它在前n个元素之后是顺序的。

为了确保它从一开始就是连续的,可以将边界设置为1,但是这可能会导致一些性能问题。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章