Kafka Java API偏移量操作说明

码头

我正在尝试使用最新的kafka_2.10-0.8.2.1使用低级的Consumer Java API手动管理偏移量。为了验证我从Kafka提交/读取的偏移量是正确的,我使用kafka.tools.ConsumerOffsetChecker工具。

这是我的主题/消费者组的输出示例:

./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group   elastic_search_group --zookeeper localhost:2181 --topic my_log_topic
Group           Topic                          Pid Offset          logSize         Lag             Owner
elastic_search_group my_log_topic              0   5               29              24              none

  这是我对结果的解释:

偏移= 5->这是我的'elastic_search_group'使用者当前的偏移量

logSize = 29->这是最新的偏移量-即将到达此主题/分区的下一条消息的偏移量

滞后= 24-> 29-5-我的'elastic_search_group'使用者尚未处理多少消息

PID-分区ID

Q1:这是正确的吗?

现在,我想从Java使用者那里获得相同的信息。在这里,我发现必须使用两种不同的API:

kafka.javaapi。OffsetRequest以获取最早和最新的偏移量,但可以获取kafka.javaapi。OffsetFetchRequest获取当前偏移量。

要获得最早(或最新)补偿,我要做:

TopicAndPartition topicAndPartition = new TopicAndPartition(myTopic, myPartition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.EarliestTime(), 1));
// OR for Latest: requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = simpleConsumer.getOffsetsBefore(request);
long[] offsets = response.offsets(topic, partition);
long myEarliestOffset = offsets[0];
// OR for Latest: long myLatestOffset = offsets[0];

为了获得当前的偏移量,我必须使用完全不同的API:

short versionID = 0;
int correlationId = 0;
List<TopicAndPartition> topicPartitionList = new ArrayList<TopicAndPartition>(); 
TopicAndPartition myTopicAndPartition = new TopicAndPartition(myTopic, myPartition);
topicPartitionList.add(myTopicAndPartition); 
OffsetFetchRequest offsetFetchReq = new OffsetFetchRequest(
kafkaGroupId, topicPartitionList, versionID, correlationId, kafkaGroupId);
OffsetFetchResponse offsetFetchResponse = simpleConsumer.fetchOffsets(offsetFetchReq);
long currentOffset = offsetFetchResponse.offsets().get(myTopicAndPartition).offset();

Q2:正确吗?为什么会有两个不同的API来获得非常相似的信息?

问题3:我在这里使用哪个versionId和relatedId无关紧要?我虽然versionId对于0.8.2.1之前的kafka应该为0,对于0.8.2.1及更高版本为1,但似乎对于0.8.2.1也适用于0-见下文?

因此,对于上面主题的示例状态以及上面的ConsumerOffsetChecker输出,这是我从Java代码中获得的内容:

currentOffset = 5; earlyestOffset = 29; LatestOffset = 29

'currentOffset'似乎还可以,'latestOffset'也是正确的,但是'earliestOffset'是吗?我希望它至少为“ 5”?

Q4:最早的Offset高于currentOffset怎么可能?我唯一的怀疑是,由于保留政策,该主题中的消息可能已被清除……。还有其他情况吗?

阴影88

我正在寻找在分区中查找滞后的方法。这涉及您已采取的相同步骤。到目前为止,无论我学到什么,我都能给您答案。

  1. logSize直接指向该特定分区中已累积了多少消息。或者,它指定该分区中邮件的最大偏移量。偏移量是最后一次成功使用的消息的偏移量。因此,滞后只是对数大小和偏移量之间的差异。
  2. 是的,这是正确的。到目前为止,这是找到当前偏移量和最早或最新偏移量的仅有两种方法
  3. 我不知道为什么需要指定versionId。您可以kafka.api.OffsetRequest.CurrentVersion()用来获取versionId。因此可以避免硬编码。您可以放心地将correlationId设为0。
  4. 这很奇怪。当我使用EarliestTime()时,即使我的当前偏移量有了很大的进步,我也将最早的偏移量设为0。这意味着它是分区的开始。因此,当某些消息在将来的某个时间到期时,最早的偏移量将是一些非零的数字。现在,如果由于保留策略而导致消息被清除,则应该更改滞后时间。我不确定这种行为。可以确定的一种方法是,在注意到此类读取并检查日志后运行使用者。它应该显示这样的行。

    2015-06-09 18:49:15 ::调试:: PartitionTopicInfo:52 ::重置请求的消耗偏移量:2:获取的偏移量= 405952:消耗的偏移量= 335372至335372 2015-06-09 18:49:15: :调试:: PartitionTopicInfo:52 ::重置请求的消耗偏移量:2:获取的偏移量= 405952:消耗的偏移量= 335373至335373

请注意,在上面的日志行中,提取的偏移量保持不变,消耗的偏移量正在增加。最终它将以

2015-06-09 18:49:16 ::调试:: PartitionTopicInfo:52 ::重置请求的消耗偏移量:2:获取的偏移量= 405952:消耗的偏移量= 405952至405952

则这意味着由于日志保留策略从335372偏移到405952而已过期

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

Kafka Connect偏移量。获取/设置?

来自分类Dev

Kafka-偏移量和logSize

来自分类Dev

Kafka分区偏移量分布不均

来自分类Dev

Apache Kafka偏移量是如何生成的?

来自分类Dev

使用高级API从特定的偏移量开始读取kafka消息

来自分类Dev

Kafka:消费者api:无法使用kafka-consumer-api从偏移量手动读取和确认

来自分类Dev

偏移量存储为Kafka时如何检查消费者偏移量?

来自分类Dev

解释Kafka中的复制偏移量检查点和恢复点偏移量

来自分类Dev

kafka +开始偏移量比之前的最后一个偏移量高得多

来自分类Dev

给定偏移量列表,从特定的Kafka分区读取偏移量的最快方法是什么?

来自分类Dev

Kafka Consumer偏移量提交检查以避免提交较小的偏移量

来自分类Dev

kafka是否可以区分消耗的偏移量和提交的偏移量?

来自分类Dev

如何在Spring Kafka中禁用提交偏移量以在本地存储偏移量?

来自分类Dev

如何获取kafka主题分区的最后/结束偏移量?

来自分类Dev

kafka:一个主题的不同偏移量

来自分类Dev

如何获取kafka主题分区的最新偏移量?

来自分类Dev

Kafka Connect-无法提交偏移量和刷新

来自分类Dev

在kafka分区中压缩的偏移量会怎样?

来自分类Dev

Flink(Kafka源代码)如何管理偏移量?

来自分类Dev

手动设置 kafka 组 id 的偏移量

来自分类Dev

如何使用 Spring for Kafka 寻找时间戳的偏移量

来自分类Dev

Spring kafka 不重试未提交的偏移量

来自分类Dev

java filereader以偏移量读取

来自分类Dev

Java中FileRead的偏移量> 32

来自分类Dev

主题的偏移量如何在Kafka(Kafka_net)中工作

来自分类Dev

在Kafka中,如何在两个偏移量之间重播kafka使用者?

来自分类Dev

Kafka ACL - 设置 ACL 后无法在 Kafka 工具中描述组或查看偏移量

来自分类Dev

了解Java Kafka Consumer API

来自分类Dev

为什么Java String实现中存在偏移量?

Related 相关文章

  1. 1

    Kafka Connect偏移量。获取/设置?

  2. 2

    Kafka-偏移量和logSize

  3. 3

    Kafka分区偏移量分布不均

  4. 4

    Apache Kafka偏移量是如何生成的?

  5. 5

    使用高级API从特定的偏移量开始读取kafka消息

  6. 6

    Kafka:消费者api:无法使用kafka-consumer-api从偏移量手动读取和确认

  7. 7

    偏移量存储为Kafka时如何检查消费者偏移量?

  8. 8

    解释Kafka中的复制偏移量检查点和恢复点偏移量

  9. 9

    kafka +开始偏移量比之前的最后一个偏移量高得多

  10. 10

    给定偏移量列表,从特定的Kafka分区读取偏移量的最快方法是什么?

  11. 11

    Kafka Consumer偏移量提交检查以避免提交较小的偏移量

  12. 12

    kafka是否可以区分消耗的偏移量和提交的偏移量?

  13. 13

    如何在Spring Kafka中禁用提交偏移量以在本地存储偏移量?

  14. 14

    如何获取kafka主题分区的最后/结束偏移量?

  15. 15

    kafka:一个主题的不同偏移量

  16. 16

    如何获取kafka主题分区的最新偏移量?

  17. 17

    Kafka Connect-无法提交偏移量和刷新

  18. 18

    在kafka分区中压缩的偏移量会怎样?

  19. 19

    Flink(Kafka源代码)如何管理偏移量?

  20. 20

    手动设置 kafka 组 id 的偏移量

  21. 21

    如何使用 Spring for Kafka 寻找时间戳的偏移量

  22. 22

    Spring kafka 不重试未提交的偏移量

  23. 23

    java filereader以偏移量读取

  24. 24

    Java中FileRead的偏移量> 32

  25. 25

    主题的偏移量如何在Kafka(Kafka_net)中工作

  26. 26

    在Kafka中,如何在两个偏移量之间重播kafka使用者?

  27. 27

    Kafka ACL - 设置 ACL 后无法在 Kafka 工具中描述组或查看偏移量

  28. 28

    了解Java Kafka Consumer API

  29. 29

    为什么Java String实现中存在偏移量?

热门标签

归档