Kafka Consumer with Java

ram

I have a single Kafka broker with multiple topics, each having a single partition.

I have a consumer that works just fine consuming messages from the topic.

My problem is that I need to improve the throughput of the message queue by increasing the number of partitions. Say I have four partitions on a topic: is there a way I can write four consumers, each pointed to an individual partition of the topic?

import java.util.*;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer {
   private ConsumerConnector consumerConnector = null;
   private final String topic = "mytopic";

   public void initialize() {
         Properties props = new Properties();
         props.put("zookeeper.connect", "localhost:2181");
         props.put("group.id", "testgroup");
         props.put("zookeeper.session.timeout.ms", "400");
         props.put("zookeeper.sync.time.ms", "300");
         props.put("auto.commit.interval.ms", "1000");
         ConsumerConfig conConfig = new ConsumerConfig(props);
         consumerConnector = Consumer.createJavaConsumerConnector(conConfig);
   }

   public void consume() {
         //Key = topic name, Value = No. of threads for topic
         Map<String, Integer> topicCount = new HashMap<String, Integer>();
         topicCount.put(topic, 1);

         //ConsumerConnector creates the message stream for each topic
         Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
               consumerConnector.createMessageStreams(topicCount);         

         // Get Kafka stream for topic 'mytopic'
         List<KafkaStream<byte[], byte[]>> kStreamList =
                                              consumerStreams.get(topic);
         // Iterate stream using ConsumerIterator
         for (final KafkaStream<byte[], byte[]> kStreams : kStreamList) {
                ConsumerIterator<byte[], byte[]> consumerIte = kStreams.iterator();

                while (consumerIte.hasNext()) {
                       System.out.println("Message consumed from topic [" + topic + "] : "
                                       + new String(consumerIte.next().message()));
                }
         }
         //Shutdown the consumer connector
         if (consumerConnector != null)   consumerConnector.shutdown();          
   }

   public static void main(String[] args) throws InterruptedException {
         KafkaConsumer kafkaConsumer = new KafkaConsumer();
         // Configure Kafka consumer
         kafkaConsumer.initialize();
         // Start consumption
         kafkaConsumer.consume();
   }

}

Lev Stefanovich

Essentially, all you need to do is start several consumers that are all in the same consumer group. If you are using the new consumer from Kafka 0.9 or later, or the high-level consumer, Kafka takes care of dividing up the partitions so that each partition is read by exactly one consumer in the group. If you have more partitions than consumers, some consumers will receive messages from multiple partitions, but no partition is ever read by more than one consumer from the same consumer group, which ensures messages are not duplicated within the group. For the same reason, there is no point in running more consumers than partitions, since the extra consumers will sit idle. You can also fine-tune which consumer reads each partition by using the simple consumer: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
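To make the partition-to-consumer split concrete, here is a minimal sketch in plain Java that models how the partitions of one topic might be divided among the consumers of a single group. It is a simplified model of a range-style assignment, not Kafka's actual implementation, and it needs no Kafka libraries. The class name `PartitionAssignmentSketch` and the method `assign` are hypothetical names invented for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: a simplified model of range-style
// partition assignment within one consumer group. Not Kafka's own code.
final class PartitionAssignmentSketch {

    // Splits partitions 0..numPartitions-1 into contiguous ranges,
    // one range per consumer. Each partition gets exactly one owner.
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        if (numPartitions < 0 || numConsumers <= 0) {
            throw new IllegalArgumentException("need numPartitions >= 0 and numConsumers > 0");
        }
        List<List<Integer>> assignment = new ArrayList<>();
        int base = numPartitions / numConsumers;   // partitions every consumer gets
        int extra = numPartitions % numConsumers;  // first 'extra' consumers get one more
        int next = 0;
        for (int c = 0; c < numConsumers; c++) {
            int count = base + (c < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            assignment.add(owned); // an empty list means this consumer is idle
        }
        return assignment;
    }

    public static void main(String[] args) {
        int partitions = 4; // the four-partition topic from the question
        for (int consumers : new int[] {1, 2, 4, 5}) {
            System.out.println(consumers + " consumer(s): " + assign(partitions, consumers));
        }
    }
}
```

With four partitions, two consumers each own two partitions, four consumers each own exactly one, and a fifth consumer ends up with an empty list, which corresponds to the idle consumer described above.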

It seems you are using the old consumer from Kafka 0.8 or before. You may want to consider switching to the new consumer. http://kafka.apache.org/documentation.html#intro_consumers

Here is another good article with detailed examples of writing consumers using the new consumer: http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/
