特定のトピックからKafka消費者の最後のn個のメッセージを取得する

プラヴィーンクマール:

kafkaバージョン:0.9.0.1

もしそうならn = 20、私はトピックの最後の20メッセージを取得する必要があります。

私が試した

kafkaConsumer.seekToBeginning();

しかし、すべてのメッセージを取得します。最後の20件のメッセージのみを取得する必要があります。

このトピックには数十万のレコードがある場合があります

public List<JSONObject> consumeMessages(String kafkaTopicName) {
  KafkaConsumer<String, String> kafkaConsumer = null;
  boolean flag = true;
  List<JSONObject> messagesFromKafka = new ArrayList<>();
  int recordCount = 0;
  int i = 0;
  int maxMessagesToReturn = 20;

  Properties props = new Properties();         
  props.put("bootstrap.servers", "localhost:9092");
  props.put("group.id", "project.group.id");
  props.put("max.partition.fetch.bytes", "1048576000");
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  kafkaConsumer = new KafkaConsumer<>(props);

  kafkaConsumer.subscribe(Arrays.asList(kafkaTopicName));
  TopicPartition topicPartition = new TopicPartition(kafkaTopicName, 0);
  LOGGER.info("Subscribed to topic " + kafkaConsumer.listTopics());
  while (flag) {
    // will consume all the messages and store in records
    ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
    kafkaConsumer.seekToBeginning(topicPartition);

    // getting total records count
    recordCount = records.count();
    LOGGER.info("recordCount " + recordCount);
    for (ConsumerRecord<String, String> record : records) {
      if(record.value() != null) {
        if (i >= recordCount - maxMessagesToReturn) {
          // adding last 20 messages to messagesFromKafka
          LOGGER.info("kafkaMessage "+record.value());
          messagesFromKafka.add(new JSONObject(record.value()));
        }
        i++;
      }
    }
    if (recordCount > 0) {
      flag = false;
    }
  }
  kafkaConsumer.close();
  return messagesFromKafka;
}
ThisaruG:

を使用kafkaConsumer.seekToEnd(Collection<TopicPartition> partitions)して、特定のパーティションの最後のオフセットまでシークできますドキュメントに従って:

「指定された各パーティションの最後のオフセットまでシークします。この関数は遅延評価を行い、poll(Duration)またはposition(TopicPartition)が呼び出されたときにのみすべてのパーティションの最終オフセットを探します。パーティションが指定されていない場合は、現在割り当てられているすべてのパーティションの最終オフセットを探します」

次に、を使用して特定のパーティションの位置を取得できますposition(TopicPartition partition)

次に、それから20を減らし、を使用kafkaConsumer.seek(TopicPartition partition, long offset)して最新の20メッセージに到達できます。

単に、

kafkaConsumer.seekToEnd(partitionList);
long endPosition = kafkaConsumer.position(topicPartiton);
long recentMessagesStartPosition = endPosition - maxMessagesToReturn;
kafkaConsumer.seek(topicPartition, recentMessagesStartPosition);

これで、最新の20メッセージを取得できます poll()

これは単純なロジックですが、複数のパーティションがある場合は、それらのケースも考慮する必要があります。私はこれを試しませんでしたが、コンセプトを理解していただければ幸いです。

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

分類Dev

Kafkaトピックからの異なる消費者による同じメッセージの消費を許可する

分類Dev

Apache Kafka 0.9 JavaAPIトピックの最初からすべてのメッセージを消費する

分類Dev

kafkaトピックからの各メッセージを別々に消費することは可能ですか?

分類Dev

kafkaトピックからのすべてのメッセージを消費し、その後ポーリングを停止する方法はありますか?

分類Dev

kafkaトピックからの大きなメッセージを消費する方法

分類Dev

kafkaログ圧縮トピックからのメッセージを再度消費する

分類Dev

Kafkaトピックからのメッセージを消費する-応答なし

分類Dev

Kafkaの消費者が特定のメッセージをコミットする方法が必要です

分類Dev

SCS-kafkaで利用可能な最後のメッセージを消費する

分類Dev

複数のkafkaトピックからのメッセージを消費するためのベストプラクティスは何ですか?

分類Dev

Kafka:同じグループIDで1つのトピックをサブスクライブする多くの消費者がいる場合、どの消費者がメッセージを受け取りますか?

分類Dev

複数の消費者がパルサートピックで同じメッセージを消費することは可能ですか?

分類Dev

特定のオフセットでkafkaトピックからメッセージを取得する方法

分類Dev

Kafkaの消費者に記録があるトピックを特定する

分類Dev

WebSocketに接続するときにakka-stream-kafkaを使用してkafkaトピックから最後のメッセージを取得する

分類Dev

Sparkのkafkaメッセージからトピックを取得する

分類Dev

Kafkaトピックから最新のメッセージを取得する方法

分類Dev

サーバーからのkafkaメッセージをオフセット値で消費する方法は?

分類Dev

消費者が実行するときにConfluent.Kafkaのトピックにメッセージを使用する

分類Dev

Kafka消費者:Go Saramaで特定のオフセットからプログラムで消費する方法

分類Dev

カフカトピックから最後のメッセージを取得する方法はありますか?

分類Dev

Kafka:共通の消費者グループを使用して複数のトピックにアクセスする

分類Dev

特定のオフセットから始めて、JavaでKafkaからのメッセージを消費する方法

分類Dev

同じトピックから消費する複数の消費者

分類Dev

消費者グループの最後に消費されたオフセットを取得するにはどうすればよいですか?

分類Dev

Kafka トピックから利用可能なすべてのメッセージが消費された後、メッセージのリストを含む Future をどのように返しますか?

分類Dev

特定のユーザーから最後のN個のメッセージを削除する方法

分類Dev

コンソールコンシューマーで特定のポイントからkafkaトピックからのメッセージを消費する方法は?

分類Dev

kafkaトピックに送信された最後のメッセージを取得する

Related 関連記事

  1. 1

    Kafkaトピックからの異なる消費者による同じメッセージの消費を許可する

  2. 2

    Apache Kafka 0.9 JavaAPIトピックの最初からすべてのメッセージを消費する

  3. 3

    kafkaトピックからの各メッセージを別々に消費することは可能ですか?

  4. 4

    kafkaトピックからのすべてのメッセージを消費し、その後ポーリングを停止する方法はありますか?

  5. 5

    kafkaトピックからの大きなメッセージを消費する方法

  6. 6

    kafkaログ圧縮トピックからのメッセージを再度消費する

  7. 7

    Kafkaトピックからのメッセージを消費する-応答なし

  8. 8

    Kafkaの消費者が特定のメッセージをコミットする方法が必要です

  9. 9

    SCS-kafkaで利用可能な最後のメッセージを消費する

  10. 10

    複数のkafkaトピックからのメッセージを消費するためのベストプラクティスは何ですか?

  11. 11

    Kafka:同じグループIDで1つのトピックをサブスクライブする多くの消費者がいる場合、どの消費者がメッセージを受け取りますか?

  12. 12

    複数の消費者がパルサートピックで同じメッセージを消費することは可能ですか?

  13. 13

    特定のオフセットでkafkaトピックからメッセージを取得する方法

  14. 14

    Kafkaの消費者に記録があるトピックを特定する

  15. 15

    WebSocketに接続するときにakka-stream-kafkaを使用してkafkaトピックから最後のメッセージを取得する

  16. 16

    Sparkのkafkaメッセージからトピックを取得する

  17. 17

    Kafkaトピックから最新のメッセージを取得する方法

  18. 18

    サーバーからのkafkaメッセージをオフセット値で消費する方法は?

  19. 19

    消費者が実行するときにConfluent.Kafkaのトピックにメッセージを使用する

  20. 20

    Kafka消費者:Go Saramaで特定のオフセットからプログラムで消費する方法

  21. 21

    カフカトピックから最後のメッセージを取得する方法はありますか?

  22. 22

    Kafka:共通の消費者グループを使用して複数のトピックにアクセスする

  23. 23

    特定のオフセットから始めて、JavaでKafkaからのメッセージを消費する方法

  24. 24

    同じトピックから消費する複数の消費者

  25. 25

    消費者グループの最後に消費されたオフセットを取得するにはどうすればよいですか?

  26. 26

    Kafka トピックから利用可能なすべてのメッセージが消費された後、メッセージのリストを含む Future をどのように返しますか?

  27. 27

    特定のユーザーから最後のN個のメッセージを削除する方法

  28. 28

    コンソールコンシューマーで特定のポイントからkafkaトピックからのメッセージを消費する方法は?

  29. 29

    kafkaトピックに送信された最後のメッセージを取得する

ホットタグ

アーカイブ