トピック名、パーティション番号、オフセットを指定して、トピックから1つのレコードだけを読み取るにはどうすればよいですか?
Sprng Bootベースのアプリケーションでは、ビジネスデータのインポートにKafkaを使用しています。インポートレコードはimport_queueに送信され、1つ以上のビジネスモジュールによって消費されます。次のレコードからのデータインポートを続行するために、コンシューマーがレコードからデータをインポートしなかった場合でも、レコードは常に確認応答されます。
後で、ユーザーは(依存するビジネスデータを修正した後)、1つ以上の失敗した(ただし確認済みの)インポートレコードを再送信することを決定できます。
すべてのレコードのオフセット、パーティション番号、トピック名は、アプリケーションのSQLデータベースの内部に保存されています。
リファレンスドキュメントといくつかのStackOverflowの質問から、次のことを行う必要があることがわかりました。
これは、kafkaトピックから古いレコードを1つだけ読み取る唯一の方法ですか?それとももっと簡単な解決策はありますか?
@Garyによって提案されたように:
ConsumerRecord<byte[], byte[]> read(String topic, int partition, long offset) {
Map<String, Object> configs = Map.of(
"bootstrap.servers", "localhost:9092",
"group.id", "incubator_retry",
"max.poll.records", 1);
DefaultKafkaConsumerFactory<byte[], byte[]> consumerFactory = new DefaultKafkaConsumerFactory<>(
configs, new ByteArrayDeserializer(), new ByteArrayDeserializer());
try (Consumer<byte[], byte[]> consumer = consumerFactory.createConsumer()) {
TopicPartition topicPartition = new TopicPartition(topic, partition);
consumer.assign(List.of(topicPartition));
consumer.seek(topicPartition, offset);
ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(5000));
if (consumerRecords.isEmpty()) {
throw new RuntimeException(String.format("Timeout polling from topic %s partition %d at offset %d",
topicPartition.topic(), topicPartition.partition(), offset));
}
return consumerRecords.iterator().next();
}
}
より簡単な解決策があります。
DefaultConsumerFactory
て作成しますKafkaConsumer
(または単に作成します)group.id
max.poll.records
プロパティを1に設定しますconsumer.assign(...)
目的のトピック/パーティションseek(...)
必要なオフセットまでpoll(...)
あなたが記録を得るまでclose()
消費者(Kafkaデシリアライザーを除く)メッセージ変換を使用している場合は、コンバーターを手動で呼び出す必要があります。
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加