我正在Spring Integration中进行项目,但遇到了很大的问题。流程中有一些过滤组件,稍后在流程中我有一个聚合元素。
问题是过滤组件不支持“应用顺序”属性。它在不修改原始序列号的情况下过滤掉一些记录,但是减少了消息数。在流程的稍后部分,我需要一个聚合,由于某些消息被滤除,因此无法释放元素。
我不想使用任何具有apply-sequence属性的特殊路由元素。您能建议我解决此类过滤问题的任何常见解决方案吗?
谢谢,
我会说您误解了filter
和的行为aggregator
。
我猜您apply-sequence
上游有一些感知组件。因此,该组中的所有消息都接受多个标头-correlationId
在默认情况下 将消息分组aggregator
;sequenceNumber
-index
消息的;sequenceSize
-组中的邮件数。
Filter
只是检查消息是否存在某种情况,然后将其发送到“outpu-channel
或”discard
逻辑。它不会修改消息。但是,即使我们可以做到,但这听起来也不是很好。
假设组中只有两条消息。首先可以进行过滤-我们只需将其发送到即可aggregator
。但是第二个被丢弃了,是的,它不会被发送到聚合器。最后一个永远不会释放该组,因为sequenceSize
没有达到。
为了克服你的要求,你需要有一些自定义ReleaseStrategy
的aggregator
(默认情况下它是SequenceSizeReleaseStrategy
)。例如,要检查系统中的某些状态,以确保该组中的所有消息均独立于发送true
或在发送false
之后发送filter
。或fake
出于相同原因收到一些消息,然后在组中检查其可用性。
在这种情况下,您只需要注意correlationId
在聚合器中对消息进行分组即可。
更新
对于这种情况,建议的发布策略是什么?使用超时作为发布策略是一个好策略吗?
我可以说,有时候对于某些集成方案来说很难找到好的解决方案。消息传递是stateless
天生的,因此要关联和分组数量不确定的消息可能是个问题。
需要查看需求和环境。
例如,当所有消息都在单个线程中处理时,您可以安全地将一些fake
标记消息直接直接发送到aggregator
并从中进行检查ReleaseStrategy
。即使该群组中的所有消息都可能被丢弃,它也将起作用。
如果您并行处理这些消息,或者它们是从不同的线程接收的,则您实际上将无法确定消息的顺序和每个进程的时间。
在这种情况下,TimeoutCountSequenceSizeReleaseStrategy
确实可以提供帮助。当然,需要根据您的系统要求找到合适的时间表。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句