Spring Kafkaバッチリスナーはバッチモードでdbトランザクションをコミットしますか?失敗した場合、トランザクション全体がロールバックされますか?

リテッシュ:

カフカのメッセージを読んでデータベースに保存するという簡単な要件があります。春のカフカをバッチリスナーモードで使用しています。私は春のカフカのドキュメントを調べましたが、バッチリスナーモードで春のカフカを使用すると、バッチモードでdbトランザクションをコミットし、障害が発生した場合に完全なトランザクションがロールバックされることはまだ明確ではありませんか?

失敗した場合、同じレコードのセットを再度探しますか?

以下の設定があります、

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigProperties.getBootstrapservers());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfigProperties.getConsumer().getGroupid());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaConfigProperties.getConsumer().getOffset());
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,250);
props.put(ApplicationConstant.KAFKA_SCHEMA_URL_PROPERTY, kafkaConfigProperties.getSchemaregistry());
@Bean
    public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> kafkaListenerContainerFactory(KafkaConfigProperties kafkaConfigProperties) {
        ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        
        factory.setConsumerFactory(consumerFactory(kafkaConfigProperties));
        factory.setConcurrency(2);
        factory.setBatchListener(true);
        ContainerProperties containerProperties = factory.getContainerProperties();
        containerProperties.setAckOnError(false);
        containerProperties.setAckMode(AckMode.BATCH);
        return factory;
    }
ゲイリーラッセル:

SeekToCurrentBatchErrorHandlerまたはを追加してRecoveringBatchErrorHander、バッチを再生する必要があります。これは、バージョン2.5以降のデフォルトのエラーハンドラです。

ドキュメントを参照してください

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

Related 関連記事

ホットタグ

アーカイブ