Kafka consumer - what's the relation of consumer processes and threads with topic partitions

Asif Iqbal

I have been working with Kafka lately and have a bit of confusion regarding the consumers under a consumer group. The center of the confusion is whether to implement consumers as processes or threads. For this question, assume I am using the high-level consumer.

Let's consider a scenario that I have experimented with. My topic has 2 partitions (for simplicity, let's assume the replication factor is just 1). I created a consumer (ConsumerConnector) process consumer1 with group group1, then created a topic count map requesting 2 streams for the topic, and then spawned 2 consumer threads, consumer1_thread1 and consumer1_thread2, under that process. It looks like consumer1_thread1 is consuming partition 0 and consumer1_thread2 is consuming partition 1. Is this behaviour always deterministic? Below is the code snippet. Class TestConsumer is my consumer thread class.

    ...
    // Ask the high-level consumer for 2 streams (one per consumer thread) for this topic.
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, 2);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    executor = Executors.newFixedThreadPool(2);

    // Hand each stream to its own TestConsumer thread.
    int threadNumber = 0;
    for (final KafkaStream<byte[], byte[]> stream : streams) {
        executor.submit(new TestConsumer(stream, threadNumber));
        threadNumber++;
    }
    ...

Now, let's consider another scenario (which I haven't experimented with but am curious about) where I start 2 consumer processes, consumer1 and consumer2, both in the same group group1, each of them single-threaded. My questions are:

  1. How will the two independent consumer processes (in the same group nevertheless) be related to the partitions in this case? How is it different from the single-process, multi-threaded scenario above?

  2. In general, how are consumer threads or processes mapped / related to partitions in the topic?

  3. The Kafka documentation says that each partition is consumed by one consumer within a consumer group. However, does "consumer" there refer to a consumer thread (as in my code example above) or to an independent consumer process?

  4. Is there any subtle thing I am missing here regarding implementing consumers as processes vs threads? Thanks in advance.

user2720864

A consumer group can have multiple consumer instances running (multiple processes with the same group id). Within a group, each partition is consumed by exactly one consumer instance at a time.

E.g. if your topic contains 2 partitions and you start a consumer group group-A with 2 consumer instances, then each of them will consume messages from one particular partition of the topic.

If you start the same 2 consumers with different group ids, group-A and group-B, then the messages from both partitions of the topic are delivered to each of them. In that case the consumer instance running under group-A receives messages from both partitions of the topic, and the same is true for group-B.

Read more on this in the Kafka documentation.
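
To make the two cases above concrete, here is a minimal pure-Java sketch with no Kafka dependency. It models only the rule "each partition has exactly one owner per group"; it is not the real client. The class `GroupModel` and the method `assign` are hypothetical names, and the round-robin split is an assumption chosen for simplicity, so Kafka's actual assignment may distribute partitions differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupModel {
    // Hypothetical helper: split partitions 0..numPartitions-1 among the
    // consumers of ONE group so that each partition has exactly one owner.
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("a group needs at least one consumer");
        }
        Map<String, List<Integer>> owned = new LinkedHashMap<>();
        for (String c : consumers) {
            owned.put(c, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            // Round-robin is an illustrative assumption, not Kafka's exact algorithm.
            owned.get(consumers.get(p % consumers.size())).add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        // Same group: the two instances share the topic's 2 partitions.
        System.out.println("group-A: " + assign(Arrays.asList("instance-1", "instance-2"), 2));
        // Different groups: each group is assigned independently, so its only
        // member owns both partitions and sees every message.
        System.out.println("group-A: " + assign(Arrays.asList("instance-1"), 2));
        System.out.println("group-B: " + assign(Arrays.asList("instance-2"), 2));
    }
}
```

In the first call each instance ends up with one partition; in the last two calls each group's single instance owns both partitions, which is the "broadcast" behaviour described above.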

EDIT: Based on your comment, which says:

I was wondering what is the effective difference between having 2 consumer threads under the same process as opposed to 2 consumer processes (group being the same in both cases)

The consumer group id is global across the cluster. Suppose you have started process-one with 2 threads and then spawn another process (maybe on a different machine) with the same group id and 2 more threads. Kafka will add these 2 new threads to the set of threads consuming the topic, so eventually there will be 4 threads responsible for consuming from the same topic. Kafka will then trigger a re-balance to re-assign partitions to threads, so a partition that was being consumed by thread T1 of process P1 may be re-assigned to thread T2 of process P2. The following lines are taken from the wiki page:

When a new process is started with the same Consumer Group name, Kafka will add that processes' threads to the set of threads available to consume the Topic and trigger a 're-balance'. During this re-balance Kafka will assign available partitions to available threads, possibly moving a partition to another process. If you have a mixture of old and new business logic, it is possible that some messages go to the old logic.
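
The re-balance described above can be sketched without a broker. The following is a simplified model, not the Kafka implementation: `RebalanceModel` and `rebalance` are hypothetical names, and the range-style split (sort the thread ids, then hand out contiguous blocks of partitions) is an assumption about how the assignment behaves. It illustrates two things: partitions can move between processes when new threads join, and threads beyond the number of partitions receive nothing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RebalanceModel {
    // Hypothetical model of a range-style split: sort the thread ids, then give
    // each thread a contiguous block of partitions. The first
    // (numPartitions % threadCount) threads receive one extra partition.
    static Map<String, List<Integer>> rebalance(List<String> threadIds, int numPartitions) {
        if (threadIds.isEmpty()) {
            throw new IllegalArgumentException("need at least one consumer thread");
        }
        List<String> sorted = new ArrayList<>(threadIds);
        Collections.sort(sorted);
        int base = numPartitions / sorted.size();
        int extra = numPartitions % sorted.size();
        Map<String, List<Integer>> owned = new TreeMap<>();
        int next = 0;
        for (int i = 0; i < sorted.size(); i++) {
            int count = base + (i < extra ? 1 : 0);
            List<Integer> parts = new ArrayList<>();
            for (int k = 0; k < count; k++) {
                parts.add(next++);
            }
            owned.put(sorted.get(i), parts);
        }
        return owned;
    }

    public static void main(String[] args) {
        List<String> oneProcess = Arrays.asList("P1-T1", "P1-T2");
        List<String> twoProcesses = Arrays.asList("P1-T1", "P1-T2", "P2-T1", "P2-T2");

        // 4 partitions: after P2 joins, some partitions move to P2's threads.
        System.out.println("4 partitions, P1 only:   " + rebalance(oneProcess, 4));
        System.out.println("4 partitions, P1 and P2: " + rebalance(twoProcesses, 4));

        // 2 partitions: there are more threads than partitions, so two threads idle.
        System.out.println("2 partitions, P1 and P2: " + rebalance(twoProcesses, 2));
    }
}
```

Under this model the thread-versus-process distinction does not affect who may own a partition: the group sees a flat set of consumer threads, and a topic with N partitions keeps at most N of them busy.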
