我正在尝试使用最新的kafka_2.10-0.8.2.1使用低级的Consumer Java API手动管理偏移量。为了验证我从Kafka提交/读取的偏移量是正确的,我使用kafka.tools.ConsumerOffsetChecker工具。
这是我的主题/消费者组的输出示例:
./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group elastic_search_group --zookeeper localhost:2181 --topic my_log_topic
Group Topic Pid Offset logSize Lag Owner
elastic_search_group my_log_topic 0 5 29 24 none
这是我对结果的解释:
偏移= 5->这是我的'elastic_search_group'使用者当前的偏移量
logSize = 29->这是最新的偏移量-即将到达此主题/分区的下一条消息的偏移量
滞后= 24-> 29-5-我的'elastic_search_group'使用者尚未处理多少消息
PID-分区ID
Q1:这是正确的吗?
现在,我想从Java使用者那里获得相同的信息。在这里,我发现必须使用两种不同的API:
kafka.javaapi。OffsetRequest以获取最早和最新的偏移量,但可以获取kafka.javaapi。OffsetFetchRequest获取当前偏移量。
要获得最早(或最新)补偿,我要做:
TopicAndPartition topicAndPartition = new TopicAndPartition(myTopic, myPartition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.EarliestTime(), 1));
// OR for Latest: requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = simpleConsumer.getOffsetsBefore(request);
long[] offsets = response.offsets(topic, partition);
long myEarliestOffset = offsets[0];
// OR for Latest: long myLatestOffset = offsets[0];
为了获得当前的偏移量,我必须使用完全不同的API:
short versionID = 0;
int correlationId = 0;
List<TopicAndPartition> topicPartitionList = new ArrayList<TopicAndPartition>();
TopicAndPartition myTopicAndPartition = new TopicAndPartition(myTopic, myPartition);
topicPartitionList.add(myTopicAndPartition);
OffsetFetchRequest offsetFetchReq = new OffsetFetchRequest(
kafkaGroupId, topicPartitionList, versionID, correlationId, kafkaGroupId);
OffsetFetchResponse offsetFetchResponse = simpleConsumer.fetchOffsets(offsetFetchReq);
long currentOffset = offsetFetchResponse.offsets().get(myTopicAndPartition).offset();
Q2:正确吗?为什么会有两个不同的API来获得非常相似的信息?
问题3:我在这里使用哪个versionId和relatedId无关紧要?我虽然versionId对于0.8.2.1之前的kafka应该为0,对于0.8.2.1及更高版本为1,但似乎对于0.8.2.1也适用于0-见下文?
因此,对于上面主题的示例状态以及上面的ConsumerOffsetChecker输出,这是我从Java代码中获得的内容:
currentOffset = 5; earlyestOffset = 29; LatestOffset = 29
'currentOffset'似乎还可以,'latestOffset'也是正确的,但是'earliestOffset'是吗?我希望它至少为“ 5”?
Q4:最早的Offset高于currentOffset怎么可能?我唯一的怀疑是,由于保留政策,该主题中的消息可能已被清除……。还有其他情况吗?
我正在寻找在分区中查找滞后的方法。这涉及您已采取的相同步骤。到目前为止,无论我学到什么,我都能给您答案。
kafka.api.OffsetRequest.CurrentVersion()
用来获取versionId。因此可以避免硬编码。您可以放心地将correlationId设为0。这很奇怪。当我使用EarliestTime()时,即使我的当前偏移量有了很大的进步,我也将最早的偏移量设为0。这意味着它是分区的开始。因此,当某些消息在将来的某个时间到期时,最早的偏移量将是一些非零的数字。现在,如果由于保留策略而导致消息被清除,则应该更改滞后时间。我不确定这种行为。可以确定的一种方法是,在注意到此类读取并检查日志后运行使用者。它应该显示这样的行。
2015-06-09 18:49:15 ::调试:: PartitionTopicInfo:52 ::重置请求的消耗偏移量:2:获取的偏移量= 405952:消耗的偏移量= 335372至335372 2015-06-09 18:49:15: :调试:: PartitionTopicInfo:52 ::重置请求的消耗偏移量:2:获取的偏移量= 405952:消耗的偏移量= 335373至335373
请注意,在上面的日志行中,提取的偏移量保持不变,消耗的偏移量正在增加。最终它将以
2015-06-09 18:49:16 ::调试:: PartitionTopicInfo:52 ::重置请求的消耗偏移量:2:获取的偏移量= 405952:消耗的偏移量= 405952至405952
则这意味着由于日志保留策略从335372偏移到405952而已过期
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句