C# Unable to consume message on Kafka topic?

FBryant87

I've been looking through several examples of the Confluent.Kafka client (https://github.com/confluentinc/confluent-kafka-dotnet/), and whilst I can successfully get a producer to push a message into Kafka, I'm unable to pull any messages back down with consumers.

Through the UI I can see the topic is created and messages are going into it (there are currently 10 partitions and 3 messages), but my consumer only ever reports "end of partition" and never consumes a message (the 3 remain on the topic and "OnMessage" never fires).

However, the consumer is definitely reaching the topic, and can see the 3 messages on one of the partitions:

end of partition: dotnet-test-topic [6] @3

It just doesn't consume the message and trigger OnMessage(). Any ideas?

var conf = new Dictionary<string, object> 
{ 
    { "group.id", Guid.NewGuid().ToString() },
    { "bootstrap.servers", "mykafkacluster:9094" },
    { "sasl.mechanisms", "SCRAM-SHA-256" },
    { "security.protocol", "SASL_SSL" },
    { "sasl.username", "myuser" },
    { "sasl.password", "mypass" }
};

using (var producer = new Producer<string, string>(conf, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8)))
{
    producer.ProduceAsync("dotnet-test-topic", "some key", "some value")
            .ContinueWith(result => 
            {
                var msg = result.Result;
                if (msg.Error.Code != ErrorCode.NoError)
                {
                    Console.WriteLine($"failed to deliver message: {msg.Error.Reason}");
                }
                else 
                {
                    Console.WriteLine($"delivered to: {result.Result.TopicPartitionOffset}");
                }
            });

    producer.Flush(TimeSpan.FromSeconds(10));
}

using (var consumer = new Consumer<string, string>(conf, new StringDeserializer(Encoding.UTF8), new StringDeserializer(Encoding.UTF8)))
{ 
    consumer.Subscribe("dotnet-test-topic");

    consumer.OnConsumeError += (_, err)
        => Console.WriteLine($"consume error: {err.Error.Reason}");

    consumer.OnMessage += (_, msg)
        => Console.WriteLine($"consumed: {msg.Value}");

    consumer.OnPartitionEOF += (_, tpo)
        => Console.WriteLine($"end of partition: {tpo}");

    while (true)
    {
        consumer.Poll(TimeSpan.FromMilliseconds(100));
    }  
}

FBryant87

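It looks like the OnMessage event won't fire unless the following config is provided:

{ "auto.offset.reset", "smallest" }

With this added, I was able to read the messages on the topic. The reason is that the consumer uses a fresh group.id (a new Guid) on every run, so the group has no committed offsets, and the default auto.offset.reset value ("largest") starts it at the end of each partition. That matches the "end of partition: dotnet-test-topic [6] @3" output: the consumer began at offset 3, after the 3 existing messages.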
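For reference, here is a minimal sketch of the consumer config from the question with this setting added. The class and method names (ConsumerSettings, Create) are hypothetical, chosen only for illustration, and the host and credentials are the question's placeholders. It assumes the same dictionary-based config the question uses.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of Confluent.Kafka): builds the config
// dictionary from the question with auto.offset.reset added.
static class ConsumerSettings
{
    public static Dictionary<string, object> Create()
    {
        return new Dictionary<string, object>
        {
            // A new group id on every run means there are never committed offsets,
            // so auto.offset.reset decides where consumption starts.
            { "group.id", Guid.NewGuid().ToString() },
            { "bootstrap.servers", "mykafkacluster:9094" },
            { "sasl.mechanisms", "SCRAM-SHA-256" },
            { "security.protocol", "SASL_SSL" },
            { "sasl.username", "myuser" },
            { "sasl.password", "mypass" },
            // Start from the oldest available message when no offset is committed.
            // "earliest" is an equivalent value to "smallest".
            { "auto.offset.reset", "smallest" }
        };
    }
}
```

The result can be passed to the Consumer constructor in place of conf. If only messages produced after startup are wanted, leaving the setting at its default and producing after the consumer has been assigned its partitions would be the alternative.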
