我正在寻找一个可以执行以下操作的函数:
merge :: MonadIO m => [Producer m a] -> Producer m a
我快速浏览了一下stm-conduit
,它看起来很相似,但是不确定是否符合我的要求:
messagesSource :: MonadIO m => AmqpConn -> Ack -> Text -> Producer m (Message, Envelope)
messagesSource conn ack q = loop
where
loop = do
mmsg <- liftIO $ getMsg chan ack q
case mmsg of
Just (m, e) -> do
yield (m, e)
liftIO $ ackMsg chan (envDeliveryTag e) False
loop
Nothing -> loop
chan = fst $ amqpChan conn
如您所见,该管道生产者在产生消息后会对其进行确认。在简单的“单线程”管道中,它运行良好,该消息进入接收器,然后被确认。
但是,stm-conduit
这种情况可能会改变,因为据我所知,生产者不会等待消息接收器使用该消息,而是会并行工作,并且消息可能会过早地被确认。
我的理解stm-conduit
正确吗?
将单独的源合并为一个具有良好的单流语义的方法是什么?
更新:根据要求将代码更新为实际工作的AMQP示例(但是可能有点吵)。
更新2:我认为我所追求的可能是管道源的另类实例,因此我可以做类似的事情let src = src1 <|> src2
。有可能吗?
mergeSources
在stm-conduit
保持TBMChannel
幕后。您所有的来源/生产者都首先连接到TBMChannel
,然后它将创建一个单一的来源,尝试从通道FIFO中提取值。
您可以设置绑定,中间的TBMChannel
用时mergeSources
。假设您将bound设置为n,那么所有Source产生的前n个值将被转储到theTBMChannel
和the AmqpConn
now,假设它在AmqpConn
末尾没有被阻塞,并且您的使用者比source慢(BTWAmqpConn
使用无界,Control.Concurrent.Chan
因此它赢得了不会阻止)。此后,TBMChannel已满,因此将阻止试图为该通道产生值的源。您的消费者从合并来源中一个一个地获取价值,因此它在前n个元素之后是顺序的。
为了确保它从一开始就是连续的,可以将边界设置为1,但是这可能会导致一些性能问题。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句