Spring Boot Kafka consumer

suman j

I am using the Spring Integration Kafka DSL, but the listener does not receive any messages. In the same application, if I replace the Spring Integration DSL with a method annotated with @KafkaListener, messages are consumed fine. What am I missing with the DSL?

DSL code that does not consume:

@Configuration
@EnableKafka
class KafkaConfig {
    // ConsumerFactory is provided by Spring Boot
    @Bean
    IntegrationFlow inboundKafkaEventFlow(ConsumerFactory consumerFactory) {
        IntegrationFlows
                .from(Kafka
                        .messageDrivenChannelAdapter(consumerFactory, "kafkaTopic")
                        .configureListenerContainer({ c -> c.groupId('kafka-consumer-staging') })
                        .id("kafkaTopicListener")
                        .autoStartup(true))
                .channel("logChannel")
                .get()
    }
}

logChannel (or any other channel I use) never receives the inbound messages.

If I use a plain listener instead of the code above, messages are consumed fine:

@Component
class KafkaConsumer {
    @KafkaListener(topics = ['kafkaTopic'], groupId = 'kafka-consumer-staging')
    void inboundKafkaEvent(String message) {
        log.debug("message is {}", message)
    }
}

Both approaches use the same application.properties for the Kafka consumer.

Artem Bilan

You are using Spring Integration, but you haven't enabled it in your application. You don't need to enable Kafka (@EnableKafka), though, since you are not going to consume with @KafkaListener. To enable the Spring Integration infrastructure, add @EnableIntegration to your @Configuration class: https://docs.spring.io/spring-integration/docs/5.1.6.RELEASE/reference/html/#configuration-enable-integration
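
As a rough sketch of what that change might look like when applied to the configuration from the question: the flow definition is unchanged and only the class-level annotation differs. The import lines are an assumption based on the package layout of Spring Integration 5.1.x and spring-integration-kafka, and may need adjusting for other versions.

```groovy
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.integration.config.EnableIntegration
import org.springframework.integration.dsl.IntegrationFlow
import org.springframework.integration.dsl.IntegrationFlows
import org.springframework.integration.kafka.dsl.Kafka
import org.springframework.kafka.core.ConsumerFactory

@Configuration
@EnableIntegration // enables the Spring Integration infrastructure; replaces @EnableKafka here
class KafkaConfig {

    // ConsumerFactory is the one provided by Spring Boot, as in the question
    @Bean
    IntegrationFlow inboundKafkaEventFlow(ConsumerFactory consumerFactory) {
        IntegrationFlows
                .from(Kafka
                        .messageDrivenChannelAdapter(consumerFactory, "kafkaTopic")
                        .configureListenerContainer({ c -> c.groupId('kafka-consumer-staging') })
                        .id("kafkaTopicListener")
                        .autoStartup(true))
                .channel("logChannel")
                .get()
    }
}
```

If the application still uses @KafkaListener methods elsewhere, @EnableKafka can presumably stay alongside @EnableIntegration; it is dropped here only because this flow does not rely on it.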
