我刚刚开始使用spring-kafka 2.6.4。我创建了消费者工厂,该工厂按批轮询消息:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory(MeterRegistry meterRegistry) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(kafkaProperties.getTopicConcurrency());
factory.setBatchListener(true);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
return factory;
}
现在,我想定义适当的错误处理程序,该处理程序使使用者卡在失败的记录上,而不轮询下一批。
我应该使用哪个错误处理程序?
谢谢。
的RecoveringBatchErrorHandler
(https://docs.spring.io/spring-kafka/docs/current/reference/html/#recovering-batch-eh)现在是优选的(和默认自2.5)批次错误处理程序。您的侦听器可以引发特定异常,以指示批处理中的哪个记录失败。
还有一个RetryingBatchErrorHandler
(https://docs.spring.io/spring-kafka/docs/current/reference/html/#retrying-batch-eh)。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句