如何通过 KafkaConsumer 可靠地获取所有 kafka 主题消息

扎克·麦康伯

getMessages()下面方法有时会获取 kafka 主题的所有消息。此代码在页面加载时在 Web 应用程序中执行。有时没有消息返回,有时所有消息都返回。

有没有办法设置属性和/或更改代码,以便每次都返回所有消息?

public List<String> getMessages() {
    List<String> messages = new ArrayList<>();
    try {
        ConnectionKafka connection = ConstantsHome.connectionManager.getConnectionDef(getGuid(), ConnectionKafka.class);
        Properties props = new Properties();
        props.put("bootstrap.servers", connection.getProps().get("bootstrapServers"));
        props.put("group.id", getName());
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singleton(getName()));
        consumer.poll(0);
        consumer.seekToBeginning(consumer.assignment());
        ConsumerRecords<String, String> records = consumer.poll(0);
        for (ConsumerRecord<String, String> record : records) {
            messages.add(
                String.format("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value())
            );
        }
        consumer.close(0, TimeUnit.MILLISECONDS);
    } catch (Exception e) {
        Utils.writeToLog(e, getClass().getName(), "", IErrorManager.ERROR);
    }
    Collections.sort(messages, new Comparator<String>() {
        @Override
        public int compare(String o1, String o2) {
            return Integer.valueOf(o1.substring("offset = ".length(), o1.indexOf(","))) -
            Integer.valueOf(o2.substring("offset = ".length(), o2.indexOf(",")));
        }
    });
    return messages;
}
赫拉戈斯

如果您期望获得每次通话的所有消息,您应该正确设置以下内容

enable.auto.commit = false

另一个选项是为每次迭代创建一个动态组 ID,考虑到组元数据存储在 kafka 端,我会避免使用此选项。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

如何从kafka服务器获取主题中的所有消息

来自分类Dev

在从 Kafka 主题消费了所有可用消息后,如何返回包含消息列表的未来?

来自分类Dev

从Spark中的Kafka消息获取主题

来自分类Dev

如何从具有特定偏移量的kafka主题获取消息

来自分类Dev

使用springboot在KafkaConsumer中反序列化kafka消息

来自分类Dev

Kafka 0.9如何使用KafkaConsumer手动提交偏移量时如何重新使用消息

来自分类Dev

Node.js:如何使用kafka-node从主题开始获取消息?

来自分类Dev

不挂起就无法获取 KafkaConsumer 消息

来自分类Dev

如何为具有 1 个分区的 Kafka 主题创建并发消息侦听器

来自分类Dev

Kafka和clojure没有KafkaConsumer对象的订阅方法

来自分类Dev

如何从Kafka主题中检索最新消息

来自分类Dev

如何通过Java在Kafka中创建主题

来自分类Dev

Spring Kafka Client 无法从启用 Kerberos 的 Kafka Broker 主题获取消息

来自分类Dev

如何可靠地确定最近7天未使用的所有文件夹?

来自分类Dev

如何可靠地确定最近7天未使用的所有文件夹?

来自分类Dev

如何可靠地阻止Windows 10防火墙中的所有传入连接?

来自分类Dev

消费者为什么重启后会从Kafka主题中读取所有消息?

来自分类Dev

Apache Kafka 0.9 Java API消耗主题开头的所有消息

来自分类Dev

Kafka Producer Java API未将消息分发到所有主题分区

来自分类Dev

通过网络API可靠地获取县名

来自分类Dev

在kafka主题中创建多个分区,并使用kafka-node将消息发布到所有分区

来自分类Dev

如何将消息从一个kafka主题复制到bash的另一个主题?

来自分类Dev

kafka 如何使用主题/分区/偏移量实现恰好一次的消息传递逻辑

来自分类Dev

有没有办法确定消息来自Kafka主题?

来自分类Dev

Kafka使用者不使用现有主题中的消息

来自分类Dev

当有消息写入Kafka主题时,FlinkKafakProducer的numRecordsOut始终显示为0

来自分类Dev

Kafka 0.10 Java 消费者没有从主题中读取消息

来自分类Dev

有没有办法消耗来自 kafka 主题的所有消息并在此之后停止轮询?

来自分类Dev

将UDP消息推送到Kafka主题

Related 相关文章

  1. 1

    如何从kafka服务器获取主题中的所有消息

  2. 2

    在从 Kafka 主题消费了所有可用消息后,如何返回包含消息列表的未来?

  3. 3

    从Spark中的Kafka消息获取主题

  4. 4

    如何从具有特定偏移量的kafka主题获取消息

  5. 5

    使用springboot在KafkaConsumer中反序列化kafka消息

  6. 6

    Kafka 0.9如何使用KafkaConsumer手动提交偏移量时如何重新使用消息

  7. 7

    Node.js:如何使用kafka-node从主题开始获取消息?

  8. 8

    不挂起就无法获取 KafkaConsumer 消息

  9. 9

    如何为具有 1 个分区的 Kafka 主题创建并发消息侦听器

  10. 10

    Kafka和clojure没有KafkaConsumer对象的订阅方法

  11. 11

    如何从Kafka主题中检索最新消息

  12. 12

    如何通过Java在Kafka中创建主题

  13. 13

    Spring Kafka Client 无法从启用 Kerberos 的 Kafka Broker 主题获取消息

  14. 14

    如何可靠地确定最近7天未使用的所有文件夹?

  15. 15

    如何可靠地确定最近7天未使用的所有文件夹?

  16. 16

    如何可靠地阻止Windows 10防火墙中的所有传入连接?

  17. 17

    消费者为什么重启后会从Kafka主题中读取所有消息?

  18. 18

    Apache Kafka 0.9 Java API消耗主题开头的所有消息

  19. 19

    Kafka Producer Java API未将消息分发到所有主题分区

  20. 20

    通过网络API可靠地获取县名

  21. 21

    在kafka主题中创建多个分区,并使用kafka-node将消息发布到所有分区

  22. 22

    如何将消息从一个kafka主题复制到bash的另一个主题?

  23. 23

    kafka 如何使用主题/分区/偏移量实现恰好一次的消息传递逻辑

  24. 24

    有没有办法确定消息来自Kafka主题?

  25. 25

    Kafka使用者不使用现有主题中的消息

  26. 26

    当有消息写入Kafka主题时,FlinkKafakProducer的numRecordsOut始终显示为0

  27. 27

    Kafka 0.10 Java 消费者没有从主题中读取消息

  28. 28

    有没有办法消耗来自 kafka 主题的所有消息并在此之后停止轮询?

  29. 29

    将UDP消息推送到Kafka主题

热门标签

归档