我正在构建一个系统,其中两个不同的实体需要处理来自同一源的消息(以不同的方式 - 例如一个将记录所有消息,而另一个实体可能想要聚合数据)。
理想情况下,每个实体对于性能和弹性都是完全可扩展的,因此我们有多个发布者、多个日志订阅者和多个聚合订阅者,但每个发布生成的每条消息仍然由一个日志订阅者和一个聚合订阅者处理。
使用 AMQP,我们可以通过发布到扇出交换器来实现这一点,该交换器将消息分发到两个队列,其中每个队列都有许多订阅者。我知道在 NATS 中可以实现相同的行为,只需让所有订阅者根据他们的角色使用两个不同的“队列组名称”来收听同一个“主题”。
在这种情况下,发往主题的消息将从每个队列组传递给一个订阅者,即每条消息将精确传递 n 次,n 是不同队列组的数量而不是订阅者的数量。这样对吗?
实际上,您可以使用队列订阅者(例如在 Go 中,它是这样的 API:func (nc *Conn) QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription, error)
.
该queue
是组名。例如,它可能在您的示例log
和aggregation
. 您可以在这些组中的每一个上创建尽可能多的队列订阅者,并且每个组中只有 1 个成员会收到给定的消息。
例如,假设您发布关于主题的消息,foo
并且您有 10 个队列订阅者foo
使用 queue namelog
和 10 个队列订阅者foo
使用 queue name aggregation
。消息将传递给 2 个订阅者,1 个用于 group log
,1 个用于 group aggregation
。
希望这可以帮助。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句