カフカのメッセージを読んでデータベースに保存するという簡単な要件があります。春のカフカをバッチリスナーモードで使用しています。私は春のカフカのドキュメントを調べましたが、バッチリスナーモードで春のカフカを使用すると、バッチモードでdbトランザクションをコミットし、障害が発生した場合に完全なトランザクションがロールバックされることはまだ明確ではありませんか?
失敗した場合、同じレコードのセットを再度探しますか?
以下の設定があります、
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigProperties.getBootstrapservers());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfigProperties.getConsumer().getGroupid());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaConfigProperties.getConsumer().getOffset());
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,250);
props.put(ApplicationConstant.KAFKA_SCHEMA_URL_PROPERTY, kafkaConfigProperties.getSchemaregistry());
@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> kafkaListenerContainerFactory(KafkaConfigProperties kafkaConfigProperties) {
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory(kafkaConfigProperties));
factory.setConcurrency(2);
factory.setBatchListener(true);
ContainerProperties containerProperties = factory.getContainerProperties();
containerProperties.setAckOnError(false);
containerProperties.setAckMode(AckMode.BATCH);
return factory;
}
SeekToCurrentBatchErrorHandler
またはを追加してRecoveringBatchErrorHander
、バッチを再生する必要があります。これは、バージョン2.5以降のデフォルトのエラーハンドラです。
ドキュメントを参照してください。
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加