kafkaバージョン:0.9.0.1
もしそうならn = 20
、私はトピックの最後の20メッセージを取得する必要があります。
私が試した
kafkaConsumer.seekToBeginning();
しかし、すべてのメッセージを取得します。最後の20件のメッセージのみを取得する必要があります。
このトピックには数十万のレコードがある場合があります
public List<JSONObject> consumeMessages(String kafkaTopicName) {
KafkaConsumer<String, String> kafkaConsumer = null;
boolean flag = true;
List<JSONObject> messagesFromKafka = new ArrayList<>();
int recordCount = 0;
int i = 0;
int maxMessagesToReturn = 20;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "project.group.id");
props.put("max.partition.fetch.bytes", "1048576000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaConsumer = new KafkaConsumer<>(props);
kafkaConsumer.subscribe(Arrays.asList(kafkaTopicName));
TopicPartition topicPartition = new TopicPartition(kafkaTopicName, 0);
LOGGER.info("Subscribed to topic " + kafkaConsumer.listTopics());
while (flag) {
// will consume all the messages and store in records
ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
kafkaConsumer.seekToBeginning(topicPartition);
// getting total records count
recordCount = records.count();
LOGGER.info("recordCount " + recordCount);
for (ConsumerRecord<String, String> record : records) {
if(record.value() != null) {
if (i >= recordCount - maxMessagesToReturn) {
// adding last 20 messages to messagesFromKafka
LOGGER.info("kafkaMessage "+record.value());
messagesFromKafka.add(new JSONObject(record.value()));
}
i++;
}
}
if (recordCount > 0) {
flag = false;
}
}
kafkaConsumer.close();
return messagesFromKafka;
}
を使用kafkaConsumer.seekToEnd(Collection<TopicPartition> partitions)
して、特定のパーティションの最後のオフセットまでシークできます。ドキュメントに従って:
「指定された各パーティションの最後のオフセットまでシークします。この関数は遅延評価を行い、
poll(Duration)
またはposition(TopicPartition)
が呼び出されたときにのみすべてのパーティションの最終オフセットを探します。パーティションが指定されていない場合は、現在割り当てられているすべてのパーティションの最終オフセットを探します」
次に、を使用して特定のパーティションの位置を取得できますposition(TopicPartition partition)
。
次に、それから20を減らし、を使用kafkaConsumer.seek(TopicPartition partition, long offset)
して最新の20メッセージに到達できます。
単に、
kafkaConsumer.seekToEnd(partitionList);
long endPosition = kafkaConsumer.position(topicPartiton);
long recentMessagesStartPosition = endPosition - maxMessagesToReturn;
kafkaConsumer.seek(topicPartition, recentMessagesStartPosition);
これで、最新の20メッセージを取得できます poll()
これは単純なロジックですが、複数のパーティションがある場合は、それらのケースも考慮する必要があります。私はこれを試しませんでしたが、コンセプトを理解していただければ幸いです。
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加