私はカフカの消費者クラスをフォローしています。トピックをリストオブジェクトとして渡すことができます。次の記事https://docs.confluent.io/current/clients/java.htmlを参照していますが、コンシューマークラスがトピックにサブスクライブされたら、どのトピックにレコードがあるかを知る必要があります。初期化。調べる方法はありますか?コードは次のとおりです。
public abstract class ConsumeLoop implements Runnable {
private final KafkaConsumer<K, V> consumer;
private final List<String> topics;
private final CountDownLatch shutdownLatch;
public BasicConsumeLoop(KafkaConsumer<K, V> consumer, List<String> topics) {
this.consumer = consumer;
this.topics = topics;
this.shutdownLatch = new CountDownLatch(1);
}
public abstract void process(ConsumerRecord<K, V> record);
public void run() {
try {
consumer.subscribe(topics); --> Consuming list of topics
while (true) {
ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE); --> Which topic is returning the records?
records.forEach(record -> process(record));
}
} catch (WakeupException e) {
// ignore, we're closing
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
consumer.close();
shutdownLatch.countDown();
}
}
}
からのpartitions()メソッドはConsumerRecords
、次のセットを返しTopicPartition
ます。
パーティション-このレコードセットに含まれるレコードを持つパーティションを取得します。
次に、そのセットを繰り返し処理して、必要に応じてtopic()
名前とpartition()
番号を取得できます。例えば:
for (TopicPartition tp : records.partitions()) {
System.out.println("Got " + records.records(tp).size() + " records "
+ "from topic:partition " + tp.topic() + ":" + tp.partition());
}
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加