我想知道是否有任何逻辑可以确定哪个使用者在同一使用者组中读取一条消息。我有一个主题,一个消费群体。但是,我有一个或多个使用者,因为在生产环境中部署了一个使用者,当我在本地运行我的应用程序时,创建了另一个订阅相同主题的使用者(这是一个测试项目,因此它不是真正的产品,我也不担心数据丢失)。我注意到,有趣的是,本地消费者总是使用任何给定的消息。这样看来,以后创建的使用者优先。
是否可以配置kafka,使较早创建的使用者优先读取?
我的设置包括3个经纪人和1个消费者组ID。此外,此属性 auto.offset.reset
设置为earliest
(将其更改为latest
不能解决问题)。我正在使用该Go库进行kafka。这是我的设置代码:
import (
"log"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func getConfig() *kafka.ConfigMap {
return &kafka.ConfigMap{
"metadata.broker.list": conf.KafkaBrokers,
"security.protocol": "SASL_SSL",
"sasl.mechanisms": "SCRAM-SHA-256",
"sasl.username": conf.KafkaUsername,
"sasl.password": conf.KafkaPassword,
"group.id": conf.KafkaGroupID,
"default.topic.config": kafka.ConfigMap{"auto.offset.reset": "earliest"},
//"debug": "generic,broker,security",
}
}
在使用者组中,每个分区都由一个使用者使用。当消费者加入该组时,其中一个将计算分配,该分配由每个消费者将处理的分区列表组成。
在您的客户端中,可以通过进行配置partition.assignment.strategy
。默认情况下range
遵循Apache Kafka的实现RangeAssignor
。
引用Javadoc:
范围分配器基于每个主题工作。对于每个主题,我们以数字顺序排列可用分区,并以字典顺序排列使用者。然后,将分区数除以使用者总数,以确定分配给每个使用者的分区数。如果它没有均匀划分,那么前几个消费者将有一个额外的划分。
例如,假设有两个使用者C0和C1,两个主题t0和t1,并且每个主题都有3个分区,从而得出分区t0p0,t0p1,t0p2,t1p0,t1p1和t1p2。
作业将是:
C0: [t0p0, t0p1, t1p0, t1p1] C1: [t0p2, t1p2]
消费者通过其在经纪人端生成的会员ID进行订购。它基于使用者client.id
和随机UUID。
实际上,我没关系为每个分区分配哪个消费者,因此我不会过多地关注该部分。相反,重要的是要了解如何分配分区并确定最适合您的用例的策略。
为了完整性,confluent-kafka-go
还支持其他策略,例如:roundrobin
和cooperative-sticky
。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句