Kafkaの消費者に記録があるトピックを特定する

サイイド

私はカフカの消費者クラスをフォローしています。トピックをリストオブジェクトとして渡すことができます。次の記事https://docs.confluent.io/current/clients/java.htmlを参照していますが、コンシューマークラスがトピックにサブスクライブされたら、どのトピックにレコードがあるかを知る必要があります。初期化。調べる方法はありますか?コードは次のとおりです。

public abstract class ConsumeLoop implements Runnable {
  private final KafkaConsumer<K, V> consumer;
  private final List<String> topics;
  private final CountDownLatch shutdownLatch;

  public BasicConsumeLoop(KafkaConsumer<K, V> consumer, List<String> topics) {
    this.consumer = consumer;
    this.topics = topics;
    this.shutdownLatch = new CountDownLatch(1);
  }

  public abstract void process(ConsumerRecord<K, V> record);

  public void run() {
  try {
      consumer.subscribe(topics);  --> Consuming list of topics

      while (true) {
        ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);  --> Which topic is returning the records?
        records.forEach(record -> process(record));    
     }  
   } catch (WakeupException e) {
     // ignore, we're closing
    } catch (Exception e) {
      log.error("Unexpected error", e);
    } finally {
       consumer.close();
      shutdownLatch.countDown();
    }
  } 
}
マザネイチャ

からのpartitions()メソッドはConsumerRecords、次のセットを返しTopicPartitionます。

パーティション-このレコードセットに含まれるレコードを持つパーティションを取得します。

次に、そのセットを繰り返し処理して、必要に応じてtopic()名前とpartition()番号を取得できます。例えば:

   for (TopicPartition tp : records.partitions()) {
     System.out.println("Got " + records.records(tp).size() + " records "
       + "from topic:partition " + tp.topic() + ":" + tp.partition());
   }

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

分類Dev

特定のトピックからKafka消費者の最後のn個のメッセージを取得する

分類Dev

Kafkaの消費者がトピックを動的にピックアップ

分類Dev

Kafka:共通の消費者グループを使用して複数のトピックにアクセスする

分類Dev

Kafkaトピックからの異なる消費者による同じメッセージの消費を許可する

分類Dev

Apacheのカフカは、特定のトピックに関する消費者のリストを取得します

分類Dev

Python kafka:新しいメッセージが投稿されるまでkafkaトピックで消費者をブロックする方法はありますか?

分類Dev

消費者が実行するときにConfluent.Kafkaのトピックにメッセージを使用する

分類Dev

消費者が火花であるカフカトピックのラグを測定する方法は?

分類Dev

Kafka:消費者がトピックからの読み取りを終了する前に接続が中断されるとどうなりますか?

分類Dev

kafka、消費者グループ、およびトピックを理解する

分類Dev

Kafkaの消費者が特定のメッセージをコミットする方法が必要です

分類Dev

同じトピックから消費する複数の消費者

分類Dev

Kafka:同じグループIDで1つのトピックをサブスクライブする多くの消費者がいる場合、どの消費者がメッセージを受け取りますか?

分類Dev

Kafka消費者のオフセットを決定する方法

分類Dev

Kafka1つのトピックを並行して消費する方法

分類Dev

複数の消費者がパルサートピックで同じメッセージを消費することは可能ですか?

分類Dev

kafkaトピックからの各メッセージを別々に消費することは可能ですか?

分類Dev

Kafka消費者:Go Saramaで特定のオフセットからプログラムで消費する方法

分類Dev

Kafkaの消費者から特定の指標にアクセスする方法

分類Dev

消費者グループが複数のトピック パーティションにサブスクライブしている場合、kafka はどのようにして最初に読み取るかを決定しますか?

分類Dev

Kafkaの消費者は、次にポーリングするトピックからどのように選択しますか?

分類Dev

Bash:bashスクリプトの実行中に最大のメモリ/ CPU消費量を記録する方法は?

分類Dev

JMXでKafka消費者メトリクスを監視する

分類Dev

RabbitMQ-トピック交換と競合する消費者

分類Dev

kafkaトピックを消費し、httpを介して解析/提供する方法は?

分類Dev

Kafkaトピックの消費者数の監視

分類Dev

Apache Kafka 0.9 JavaAPIトピックの最初からすべてのメッセージを消費する

分類Dev

kafkaトピックからのすべてのメッセージを消費し、その後ポーリングを停止する方法はありますか?

分類Dev

C#Kafkaトピックに関するメッセージを消費できませんか?

Related 関連記事

  1. 1

    特定のトピックからKafka消費者の最後のn個のメッセージを取得する

  2. 2

    Kafkaの消費者がトピックを動的にピックアップ

  3. 3

    Kafka:共通の消費者グループを使用して複数のトピックにアクセスする

  4. 4

    Kafkaトピックからの異なる消費者による同じメッセージの消費を許可する

  5. 5

    Apacheのカフカは、特定のトピックに関する消費者のリストを取得します

  6. 6

    Python kafka:新しいメッセージが投稿されるまでkafkaトピックで消費者をブロックする方法はありますか?

  7. 7

    消費者が実行するときにConfluent.Kafkaのトピックにメッセージを使用する

  8. 8

    消費者が火花であるカフカトピックのラグを測定する方法は?

  9. 9

    Kafka:消費者がトピックからの読み取りを終了する前に接続が中断されるとどうなりますか?

  10. 10

    kafka、消費者グループ、およびトピックを理解する

  11. 11

    Kafkaの消費者が特定のメッセージをコミットする方法が必要です

  12. 12

    同じトピックから消費する複数の消費者

  13. 13

    Kafka:同じグループIDで1つのトピックをサブスクライブする多くの消費者がいる場合、どの消費者がメッセージを受け取りますか?

  14. 14

    Kafka消費者のオフセットを決定する方法

  15. 15

    Kafka1つのトピックを並行して消費する方法

  16. 16

    複数の消費者がパルサートピックで同じメッセージを消費することは可能ですか?

  17. 17

    kafkaトピックからの各メッセージを別々に消費することは可能ですか?

  18. 18

    Kafka消費者:Go Saramaで特定のオフセットからプログラムで消費する方法

  19. 19

    Kafkaの消費者から特定の指標にアクセスする方法

  20. 20

    消費者グループが複数のトピック パーティションにサブスクライブしている場合、kafka はどのようにして最初に読み取るかを決定しますか?

  21. 21

    Kafkaの消費者は、次にポーリングするトピックからどのように選択しますか?

  22. 22

    Bash:bashスクリプトの実行中に最大のメモリ/ CPU消費量を記録する方法は?

  23. 23

    JMXでKafka消費者メトリクスを監視する

  24. 24

    RabbitMQ-トピック交換と競合する消費者

  25. 25

    kafkaトピックを消費し、httpを介して解析/提供する方法は?

  26. 26

    Kafkaトピックの消費者数の監視

  27. 27

    Apache Kafka 0.9 JavaAPIトピックの最初からすべてのメッセージを消費する

  28. 28

    kafkaトピックからのすべてのメッセージを消費し、その後ポーリングを停止する方法はありますか?

  29. 29

    C#Kafkaトピックに関するメッセージを消費できませんか?

ホットタグ

アーカイブ