kafka如何确定单个消费者组中的哪个消费者阅读消息?

搭便车的人

我想知道是否有任何逻辑可以确定哪个使用者在同一使用者组中读取一条消息。我有一个主题,一个消费群体。但是,我有一个或多个使用者,因为在生产环境中部署了一个使用者,当我在本地运行我的应用程序时,创建了另一个订阅相同主题的使用者(这是一个测试项目,因此它不是真正的产品,我也不担心数据丢失)。我注意到,有趣的是,本地消费者总是使用任何给定的消息。这样看来,以后创建的使用者优先。

是否可以配置kafka,使较早创建的使用者优先读取?

我的设置包括3个经纪人和1个消费者组ID。此外,此属性 auto.offset.reset设置为earliest(将其更改为latest不能解决问题)。我正在使用该Go进行kafka。这是我的设置代码:

import (
    "log"
    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func getConfig() *kafka.ConfigMap {
    return &kafka.ConfigMap{
        "metadata.broker.list": conf.KafkaBrokers,
        "security.protocol":    "SASL_SSL",
        "sasl.mechanisms":      "SCRAM-SHA-256",
        "sasl.username":        conf.KafkaUsername,
        "sasl.password":        conf.KafkaPassword,
        "group.id":             conf.KafkaGroupID,
        "default.topic.config": kafka.ConfigMap{"auto.offset.reset": "earliest"},
        //"debug":                           "generic,broker,security",
    }
}
米凯尔之家

在使用者组中,每个分区都由一个使用者使用。当消费者加入该组时,其中一个将计算分配,该分配由每个消费者将处理的分区列表组成。

在您的客户端中,可以通过进行配置partition.assignment.strategy默认情况下range遵循Apache Kafka的实现RangeAssignor

引用Javadoc:

范围分配器基于每个主题工作。对于每个主题,我们以数字顺序排列可用分区,并以字典顺序排列使用者。然后,将分区数除以使用者总数,以确定分配给每个使用者的分区数。如果它没有均匀划分,那么前几个消费者将有一个额外的划分。

例如,假设有两个使用者C0和C1,两个主题t0和t1,并且每个主题都有3个分区,从而得出分区t0p0,t0p1,t0p2,t1p0,t1p1和t1p2。

作业将是:

C0: [t0p0, t0p1, t1p0, t1p1]
C1: [t0p2, t1p2]

消费者通过其在经纪人端生成的会员ID进行订购。它基于使用者client.id和随机UUID。

实际上,我没关系为每个分区分配哪个消费者,因此我不会过多地关注该部分。相反,重要的是要了解如何分配分区并确定最适合您的用例的策略。

为了完整性,confluent-kafka-go还支持其他策略,例如:roundrobincooperative-sticky

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

消费者组中的Apache Beam KafkaIO消费者正在阅读相同的消息

来自分类Dev

Kafka 消费者阅读速度太慢

来自分类Dev

在Kafka中,如何让消费者从本地分区消费?

来自分类Dev

消费者将如何阅读已提交的消息?

来自分类Dev

kafka rest的消费者组ACL

来自分类Dev

分析来自 Kafka 消费者的消息

来自分类Dev

如何从特定主题中删除Kafka消费者组?

来自分类Dev

删除Zookeeper中的kafka消费者组

来自分类Dev

在Apache Kafka中延迟使用消费者的消息

来自分类Dev

如果消费者组订阅了多个主题分区,kafka 如何决定先读取哪个?

来自分类Dev

是在消费者组级别还是在该消费者组内的单个消费者管理消费者抵销?

来自分类Dev

Kafka是消息消费者以及流拓扑的同一个消费者组

来自分类Dev

如何扩展Kafka的消费者?

来自分类Dev

如何找到kafka消费者的费率?

来自分类Dev

Tomcat 中的 Kafka 消费者关闭

来自分类Dev

Kafka 中的 LogCompaction 和消费者

来自分类Dev

Kafka消费者组,创建消费者组时设置offset为0

来自分类Dev

Kafka分区到Kafka消费者/消费者组的映射

来自分类Dev

kafka消费者如何选择消费最近的broker?

来自分类Dev

Kafka-spring消费者未阅读邮件

来自分类Dev

Spring Cloud aws 流,消息被消费者组中的多个实例消费

来自分类Dev

Spring Boot Kafka消息消费者和丢失的消息

来自分类Dev

Kafka如何处理运行速度比其他消费者慢的消费者?

来自分类Dev

在生产者/消费者场景中,如何从消费者那里得到回应?

来自分类Dev

kafka 0.10.1使用哪个消费者API?

来自分类Dev

删除未使用的kafka消费者组

来自分类Dev

消费者组没有活跃的成员-Kafka

来自分类Dev

Kafka:删除闲置的消费者组ID

来自分类Dev

ConcurrentKafkaListenerContainerFactory 的 Spring-Kafka 消费者组协调

Related 相关文章

  1. 1

    消费者组中的Apache Beam KafkaIO消费者正在阅读相同的消息

  2. 2

    Kafka 消费者阅读速度太慢

  3. 3

    在Kafka中,如何让消费者从本地分区消费?

  4. 4

    消费者将如何阅读已提交的消息?

  5. 5

    kafka rest的消费者组ACL

  6. 6

    分析来自 Kafka 消费者的消息

  7. 7

    如何从特定主题中删除Kafka消费者组?

  8. 8

    删除Zookeeper中的kafka消费者组

  9. 9

    在Apache Kafka中延迟使用消费者的消息

  10. 10

    如果消费者组订阅了多个主题分区,kafka 如何决定先读取哪个?

  11. 11

    是在消费者组级别还是在该消费者组内的单个消费者管理消费者抵销?

  12. 12

    Kafka是消息消费者以及流拓扑的同一个消费者组

  13. 13

    如何扩展Kafka的消费者?

  14. 14

    如何找到kafka消费者的费率?

  15. 15

    Tomcat 中的 Kafka 消费者关闭

  16. 16

    Kafka 中的 LogCompaction 和消费者

  17. 17

    Kafka消费者组,创建消费者组时设置offset为0

  18. 18

    Kafka分区到Kafka消费者/消费者组的映射

  19. 19

    kafka消费者如何选择消费最近的broker?

  20. 20

    Kafka-spring消费者未阅读邮件

  21. 21

    Spring Cloud aws 流,消息被消费者组中的多个实例消费

  22. 22

    Spring Boot Kafka消息消费者和丢失的消息

  23. 23

    Kafka如何处理运行速度比其他消费者慢的消费者?

  24. 24

    在生产者/消费者场景中,如何从消费者那里得到回应?

  25. 25

    kafka 0.10.1使用哪个消费者API?

  26. 26

    删除未使用的kafka消费者组

  27. 27

    消费者组没有活跃的成员-Kafka

  28. 28

    Kafka:删除闲置的消费者组ID

  29. 29

    ConcurrentKafkaListenerContainerFactory 的 Spring-Kafka 消费者组协调

热门标签

归档