spring-kafkaを使用して、特定のオフセットで特定のトピックとパーティションから古いkafkaメッセージを再送信(読み取り)する方法は?

フロール

トピック名、パーティション番号、オフセットを指定して、トピックから1つのレコードだけを読み取るにはどうすればよいですか?

Sprng Bootベースのアプリケーションでは、ビジネスデータのインポートにKafkaを使用しています。インポートレコードはimport_queueに送信され、1つ以上のビジネスモジュールによって消費されます。次のレコードからのデータインポートを続行するために、コンシューマーがレコードからデータをインポートしなかった場合でも、レコードは常に確認応答されます。

後で、ユーザーは(依存するビジネスデータを修正した後)、1つ以上の失敗した(ただし確認済みの)インポートレコードを再送信することを決定できます。

すべてのレコードのオフセット、パーティション番号、トピック名は、アプリケーションのSQLデータベースの内部に保存されています。

リファレンスドキュメントといくつかのStackOverflowの質問から、次のことを行う必要があることがわかりました。

  1. コンテナのセットアップ(消費者/リスナー)
  2. 希望のオフセットに巻き戻す(シークする)
  3. 1つのレコードを読む
  4. 残りのレコードの読み取りをスキップする

これは、kafkaトピックから古いレコードを1つだけ読み取る唯一の方法ですか?それとももっと簡単な解決策はありますか?

解決

@Garyによって提案されたように:

ConsumerRecord<byte[], byte[]> read(String topic, int partition, long offset) {
    Map<String, Object> configs = Map.of(
            "bootstrap.servers", "localhost:9092",
            "group.id", "incubator_retry",
            "max.poll.records", 1);
    DefaultKafkaConsumerFactory<byte[], byte[]> consumerFactory = new DefaultKafkaConsumerFactory<>(
            configs, new ByteArrayDeserializer(), new ByteArrayDeserializer());

    try (Consumer<byte[], byte[]> consumer = consumerFactory.createConsumer()) {
        TopicPartition topicPartition = new TopicPartition(topic, partition);
        consumer.assign(List.of(topicPartition));
        consumer.seek(topicPartition, offset);
        ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(5000));
        if (consumerRecords.isEmpty()) {
            throw new RuntimeException(String.format("Timeout polling from topic %s partition %d at offset %d",
                    topicPartition.topic(), topicPartition.partition(), offset));
        }
        return consumerRecords.iterator().next();
    }
}
ゲーリーラッセル

より簡単な解決策があります。

  • を使用しDefaultConsumerFactoryて作成しますKafkaConsumer(または単に作成します)
  • 別のものを使用する group.id
  • max.poll.recordsプロパティを1に設定します
  • consumer.assign(...) 目的のトピック/パーティション
  • seek(...) 必要なオフセットまで
  • poll(...) あなたが記録を得るまで
  • close() 消費者

(Kafkaデシリアライザーを除く)メッセージ変換を使用している場合は、コンバーターを手動で呼び出す必要があります。

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

Related 関連記事

ホットタグ

アーカイブ