我写了一个节点卡夫卡消费者。在极少数情况下,我会使用一个组ID启动kafka客户端,该组ID在某些偏移量可用时可用,但现在不再可用-导致调用“ offsetOutOfRange”事件。在这种情况下,建议的行为是什么?记录错误并退出?有办法恢复吗?我一直想从上次提交的偏移量(如果存在且可用)运行zookeeper。
client = new kafka.Client(ZOOKEEPER_URLS),
consumer = new Consumer(client, [], {
groupId: GROUP_ID,
fromOffset: true
});
consumer.on('offsetOutOfRange', function (topic) {
applicationLogger.error('Kafka consumer is trying to read from offset which is out of range', topic);
process.exit(1);
});
我想知道为什么在node-kafka-consumer中未实现此功能,但是在其他客户端中处理偏移超出范围错误的默认行为是发出OffsetRequest以获取最早或最新的可用偏移,然后将使用者偏移设置为a新价值并继续获取。
这是完全可恢复的情况,您只需要指定要恢复到的偏移量-最早或最新可用。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句