使用 Spring Cloud Stream 和 Kafka 处理重复消息

瓦迪姆

我正在将 Spring Cloud Stream 与 Kafka 活页夹一起使用。它工作得很好,但客户端收到重复的消息。已经尝试了所有 Kafka 消费者属性,但没有结果。

在我的应用程序示例中检查 2 个类 - AggregateApplication 和 EventFilterApplication。如果我运行 EventFilterApplication - 只有 1 条消息,如果是 AggregateApplication - 2 条相同的消息。


这是我的代码如下:

1) 聚合器

import com.example.EventFilterApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.aggregate.AggregateApplicationBuilder;

@SpringBootApplication
public class AggregateApplication {
    public static void main(String[] args) {
        new AggregateApplicationBuilder(new Object[]{EventFilterApplication.class}, args)
            .from(EventFilterApplication.class)
            .run(args);
    }
}

2) 事件过滤器应用程序

@SpringBootApplication
@EnableBinding(EventFilterApplication.LiveProcessor.class)
public class EventFilterApplication {

    @Autowired
    LiveProcessor source;

    @StreamListener(LiveProcessor.INPUT)
    public void handle(byte[] event) {
        try {

            System.out.println(new Date().getTime() + ": event was processed:" + Arrays.toString(event));

        } catch (Exception e) {
            System.out.println(String.format("Error={%s} on processing message=%s", e.getMessage(), Arrays.toString(event)));
        }
    }
    public static void main(String[] args) {
        SpringApplication.run(EventFilterApplication.class, args);
    }

    interface LiveProcessor extends Source {

        String INPUT = "liveSource";

        @Input(INPUT)
        SubscribableChannel input();
    }
}

3) 应用程序.yml

spring:
cloud:
    stream:
        kafka:
          binder:
              brokers: kafka-broker.example.com:9092
              defaultBrokerPort: 9092
              defaultZkPort: 2181
              zkNodes: kafka-zookeeper.example.com
        type: kafka
        bindings:
            liveSource:
                binder: kafka
                consumer:
                    headerMode: raw
                    autoCommitOffset: true
                destination: topic_example_name

4)build.gradle

buildscript {
    ext { springBootVersion = '1.4.2.RELEASE' }
    repositories {
        jcenter()
        maven { url 'http://repo.spring.io/plugins-release' }
    }
    dependencies {
        classpath("org.springframework.build.gradle:propdeps-plugin:0.0.7")
        classpath("org.springframework.boot:spring-boot-gradle-plugin:$springBootVersion")
        classpath("io.spring.gradle:dependency-management-plugin:0.5.2.RELEASE")
    }
}

ext['logstashLogbackEncoderV'] = '4.8'
ext['springCloudV'] = 'Camden.SR1'
ext['springCloudStreamV'] = 'Brooklyn.SR2'
ext['springIntegrationKafkaV'] = '1.3.1.RELEASE'

subprojects {
    apply plugin: 'java'
    apply plugin: 'propdeps'
    apply plugin: 'propdeps-idea'
    apply plugin: "io.spring.dependency-management"

    sourceCompatibility = 1.8

    dependencyManagement {
        imports {
            mavenBom "org.springframework.cloud:spring-cloud-dependencies:Camden.SR1"
            mavenBom "org.springframework.cloud:spring-cloud-stream-dependencies:Brooklyn.SR2"
            mavenBom "org.springframework.cloud.stream.app:spring-cloud-stream-app-dependencies:1.0.4.RELEASE"
        }
    }

    dependencies {
        compile("org.springframework.boot:spring-boot-starter-web:$springBootVersion") {
            exclude module: "spring-boot-starter-tomcat"
            exclude group: 'log4j'
        }

        compile("org.springframework.cloud:spring-cloud-starter-stream-kafka")

        compile("org.springframework.integration:spring-integration-kafka:$springIntegrationKafkaV") {
            exclude group: "org.slf4j"
        }

        compile("org.springframework.cloud:spring-cloud-stream:")

        compile("org.springframework.cloud:spring-cloud-starter-sleuth")

        compile("net.logstash.logback:logstash-logback-encoder:${logstashLogbackEncoderV}")

        testCompile("org.springframework.boot:spring-boot-starter-test:$springBootVersion") {
            exclude group: "org.slf4j"
        }
    }
}
马里乌斯·博戈耶维奇

重复是由EventFilterApplication作为父根引起的

public class AggregateApplication {
    public static void main(String[] args) {
        new AggregateApplicationBuilder(new Object[]{EventFilterApplication.class}, args)
            .from(EventFilterApplication.class)
            .run(args);
    }
}

这很可能会创建两个订阅。EventFilterApplication您可以简单地执行以下操作,而不是添加为 root:

public class AggregateApplication {
    public static void main(String[] args) {
        new AggregateApplicationBuilder(args)
            .from(EventFilterApplication.class)
            // rest of the pipeline
            .run(args);
    }
}

如果您不需要创建聚合,这应该足够了:

public static void main(String[] args) {
        SpringApplication.run(EventFilterApplication.class, args);
}

编辑:添加了一个额外的例子并澄清了答案。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

Spring Cloud Stream Kafka Stream 与本机 Kafka Stream 应用程序和生产者之间的 Avro 消息不兼容

来自分类Dev

使用Spring Cloud Stream和RabbitMQ设置消息属性

来自分类Dev

使用 Spring Cloud Stream 和 RabbitMq 处理异常

来自分类Dev

了解Spring Cloud Stream Kafka和Spring重试

来自分类Dev

使用Spring Cloud Stream Kafka Binder时无法设置groupId和clientId

来自分类Dev

如何实现Kafka使用者使用spring-cloud-stream来按需处理事件?

来自分类Dev

如何使用Spring Cloud Stream Supplier将密钥消息发送到Kafka

来自分类Dev

spring-cloud-stream-kafka错误处理

来自分类Dev

发送后的Spring Cloud Stream处理消息

来自分类Dev

Spring Cloud Stream Kafka记录太大

来自分类Dev

Spring Cloud Stream Kafka 错误通道

来自分类Dev

Spring Kafka/Spring Cloud Stream 如何保证涉及数据库和 Kafka 的事务性/原子性?

来自分类Dev

spring-cloud-stream-kafka离线消费者消息丢失

来自分类Dev

如何使用Kafka Steams对Spring Cloud Stream进行单元测试

来自分类Dev

是否可以在Spring Cloud Stream Kafka Streams 3.0 Binder样式API方法上使用@KafkaStreamsStateStore批注?

来自分类Dev

使用kafka-streams活页夹测试Spring Cloud Stream应用程序

来自分类Dev

如何在 Spring Cloud Stream Kafka Streams 应用程序中使用 StateStoreBuilder 添加 StateStore

来自分类Dev

Spring Cloud Stream Kafka Streams:下游消息数与发送给该主题的消息总数不匹配

来自分类常见问题

Spring Cloud Stream在Kafka Streams流程中出现Serde错误

来自分类Dev

Spring Cloud Stream Kafka活页夹压缩

来自分类Dev

spring.cloud.stream.kafka.binder.headers无法正常工作

来自分类Dev

如何更改Spring Cloud Stream Kafka活页夹的目标?

来自分类Dev

Spring Cloud Stream-Kafka Binder集成/配置

来自分类Dev

Spring Cloud Stream 2.0 是否与 Kafka broker 0.10.2 兼容?

来自分类Dev

Spring Cloud Stream Kafka 消费者测试

来自分类Dev

Spring Cloud Stream 向 Kafka 主题发送数据失败

来自分类Dev

Spring Cloud Stream kafka 客户端异常

来自分类Dev

Spring Cloud Stream 中的批处理模式和自定义错误处理

来自分类Dev

如何拦截Spring Cloud Stream消息?

Related 相关文章

  1. 1

    Spring Cloud Stream Kafka Stream 与本机 Kafka Stream 应用程序和生产者之间的 Avro 消息不兼容

  2. 2

    使用Spring Cloud Stream和RabbitMQ设置消息属性

  3. 3

    使用 Spring Cloud Stream 和 RabbitMq 处理异常

  4. 4

    了解Spring Cloud Stream Kafka和Spring重试

  5. 5

    使用Spring Cloud Stream Kafka Binder时无法设置groupId和clientId

  6. 6

    如何实现Kafka使用者使用spring-cloud-stream来按需处理事件?

  7. 7

    如何使用Spring Cloud Stream Supplier将密钥消息发送到Kafka

  8. 8

    spring-cloud-stream-kafka错误处理

  9. 9

    发送后的Spring Cloud Stream处理消息

  10. 10

    Spring Cloud Stream Kafka记录太大

  11. 11

    Spring Cloud Stream Kafka 错误通道

  12. 12

    Spring Kafka/Spring Cloud Stream 如何保证涉及数据库和 Kafka 的事务性/原子性?

  13. 13

    spring-cloud-stream-kafka离线消费者消息丢失

  14. 14

    如何使用Kafka Steams对Spring Cloud Stream进行单元测试

  15. 15

    是否可以在Spring Cloud Stream Kafka Streams 3.0 Binder样式API方法上使用@KafkaStreamsStateStore批注?

  16. 16

    使用kafka-streams活页夹测试Spring Cloud Stream应用程序

  17. 17

    如何在 Spring Cloud Stream Kafka Streams 应用程序中使用 StateStoreBuilder 添加 StateStore

  18. 18

    Spring Cloud Stream Kafka Streams:下游消息数与发送给该主题的消息总数不匹配

  19. 19

    Spring Cloud Stream在Kafka Streams流程中出现Serde错误

  20. 20

    Spring Cloud Stream Kafka活页夹压缩

  21. 21

    spring.cloud.stream.kafka.binder.headers无法正常工作

  22. 22

    如何更改Spring Cloud Stream Kafka活页夹的目标?

  23. 23

    Spring Cloud Stream-Kafka Binder集成/配置

  24. 24

    Spring Cloud Stream 2.0 是否与 Kafka broker 0.10.2 兼容?

  25. 25

    Spring Cloud Stream Kafka 消费者测试

  26. 26

    Spring Cloud Stream 向 Kafka 主题发送数据失败

  27. 27

    Spring Cloud Stream kafka 客户端异常

  28. 28

    Spring Cloud Stream 中的批处理模式和自定义错误处理

  29. 29

    如何拦截Spring Cloud Stream消息?

热门标签

归档