Kafka Consumer with Java

ram

I have a single Kafka broker with multiple topics, each having a single partition.

I have a consumer that works just fine consuming messages from the topic.

My problem is that I need to improve the throughput of the message queue by increasing the number of partitions. Say I have four partitions on a topic: is there a way that I can write four consumers, each pointed to an individual partition on the topic?

import java.util.*;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer {
   private ConsumerConnector consumerConnector = null;
   private final String topic = "mytopic";

   public void initialize() {
         Properties props = new Properties();
         props.put("zookeeper.connect", "localhost:2181");
         props.put("group.id", "testgroup");
         props.put("zookeeper.session.timeout.ms", "400");
         props.put("zookeeper.sync.time.ms", "300");
         props.put("auto.commit.interval.ms", "1000");
         ConsumerConfig conConfig = new ConsumerConfig(props);
         consumerConnector = Consumer.createJavaConsumerConnector(conConfig);
   }

   public void consume() {
         //Key = topic name, Value = No. of threads for topic
         Map<String, Integer> topicCount = new HashMap<String, Integer>();
         topicCount.put(topic, 1);

         //ConsumerConnector creates the message stream for each topic
         Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
               consumerConnector.createMessageStreams(topicCount);         

         // Get Kafka stream for topic 'mytopic'
         List<KafkaStream<byte[], byte[]>> kStreamList =
                                              consumerStreams.get(topic);
         // Iterate stream using ConsumerIterator
         for (final KafkaStream<byte[], byte[]> kStreams : kStreamList) {
                ConsumerIterator<byte[], byte[]> consumerIte = kStreams.iterator();

                while (consumerIte.hasNext())
                       System.out.println("Message consumed from topic ["
                                     + topic + "] : "
                                     + new String(consumerIte.next().message()));
         }
         //Shutdown the consumer connector
         if (consumerConnector != null)   consumerConnector.shutdown();          
   }

   public static void main(String[] args) throws InterruptedException {
         KafkaConsumer kafkaConsumer = new KafkaConsumer();
         // Configure Kafka consumer
         kafkaConsumer.initialize();
         // Start consumption
         kafkaConsumer.consume();
   }

}

Lev Stefanovich

Essentially, all you need to do is start several consumers that are all in the same consumer group. If you are using the new consumer from Kafka 0.9 or later, or the older high-level consumer, Kafka will take care of dividing the partitions among the consumers, making sure each partition is read by exactly one consumer in the group. If you have more partitions than consumers, some consumers will receive messages from multiple partitions, but no partition will ever be read by more than one consumer from the same group, so no message is processed twice within the group. For the same reason you never want more consumers than partitions, since the extra consumers will sit idle. You can also fine-tune which consumer reads each partition using the simple consumer: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
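With the new consumer API, this boils down to running four copies of a process that subscribes with a shared group.id. The sketch below is illustrative only: the broker address, topic name, and String deserializers are assumptions, and it targets a recent kafka-clients where poll() takes a Duration (on 0.9/0.10 you would use the long-millis overload instead).

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupedConsumer {
   public static void main(String[] args) {
         Properties props = new Properties();
         props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
         props.put("group.id", "testgroup");               // all four instances share this group
         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

         try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
               // Kafka divides the topic's partitions across all consumers in the group
               consumer.subscribe(Collections.singletonList("mytopic"));
               while (true) {
                     ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                     for (ConsumerRecord<String, String> record : records)
                           System.out.println("partition " + record.partition()
                                        + " offset " + record.offset()
                                        + " : " + record.value());
               }
         }
   }
}

Start four instances of this (or four threads, each with its own KafkaConsumer object, since the consumer is not thread-safe) and each instance will end up reading one of the four partitions.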

It seems you are using the old consumer from Kafka 0.8 or before. You may want to consider switching to the new consumer: http://kafka.apache.org/documentation.html#intro_consumers

Here is another good article with detailed examples of writing consumers using the new consumer: http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/
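If you really do want each consumer pinned to a specific partition yourself, the new consumer replaces the old SimpleConsumer pattern with assign() instead of subscribe(). Again just a sketch, reusing the assumed broker and topic names from above; auto-commit is disabled here because no group.id is configured:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PartitionPinnedConsumer {
   public static void main(String[] args) {
         int partition = Integer.parseInt(args[0]); // run four processes with 0, 1, 2, 3
         Properties props = new Properties();
         props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
         props.put("enable.auto.commit", "false");         // no group.id, so don't try to commit offsets
         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

         try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
               // assign() pins this consumer to one partition; no group rebalancing applies
               consumer.assign(Collections.singletonList(new TopicPartition("mytopic", partition)));
               while (true) {
                     ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                     for (ConsumerRecord<String, String> record : records)
                           System.out.println("partition " + partition + " : " + record.value());
               }
         }
   }
}

The trade-off is that manually assigned consumers get no failover from Kafka: if one process dies, its partition simply goes unread until you restart it, whereas a consumer group would reassign it automatically.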
