使用属性文件在Spring Kafka中处理错误?

瑞安

Kafka Listener遭受打击之前,我遇到了很多反序列化失败的事情我一直在研究Gary Russel建造的东西,但是在使它工作时遇到了问题。我所有的东西都是通过属性文件配置的。

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=io.confluent.kafka.serializers.KafkaAvroDeserializer

因此,如果添加这些内容,我的理解是将错误包装在消费者记录的标题中吗?我的最终目标是使任何反序列化异常都击中我拥有的某些自定义类,以便我可以处理该问题。IE,转发到我的死信处理程序,该程序将失败的数据上传到s3。

我尝试将错误处理程序标志添加到kafkalistener,但这也无济于事。

更新的属性配置

我已经更新了配置,对我来说仍然不清楚。它没有用,所以我认为没有用。

没有自定义代码被调用

spring.kafka.consumer.properties.value.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
spring.kafka.consumer.properties.key.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.function=com.thing.cyclic.service.FailedFooProvider

spring.kafka.consumer.properties.spring.deserializer.key.delegate.class=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicNameStrategy
spring.kafka.consumer.properties.specific.avro.reader=true
spring.kafka.consumer.properties.auto.register.schemas=false
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.listener.ack-mode=manual_immediate

BadFoo

public class BadFoo {

    private final FailedDeserializationInfo failedDeserializationInfo;

    public BadFoo(FailedDeserializationInfo failedDeserializationInfo) {
        this.failedDeserializationInfo = failedDeserializationInfo;
    }

    public FailedDeserializationInfo getFailedDeserializationInfo() {
        return this.failedDeserializationInfo;
    }
}

FailedFooProvider

public class FailedFooProvider implements Function<FailedDeserializationInfo, String> {
    @Override
    public String apply(FailedDeserializationInfo info) {
        System.out.println("");
        return "";
    }
}
加里·罗素

在此处此处查看文档

还要看一下DeadLetterPublishingRecoverer代码,该代码可用于将失败的记录发布到另一个主题。之后,您可以对代码建模,以获得包含failed的头byte[]

https://github.com/spring-projects/spring-kafka/blob/fa5c35e9b15c4cecfc6ea2bbbf9e7745bc5d9f75/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java#L169-L178

恢复器与结合使用SeekToCurrentErrorHandler

将错误处理程序配置为a @Bean,Spring Boot会自动将其连接到容器中。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

使用spring在属性文件中设置值

来自分类Dev

使用Shopify Sarama的Kafka错误处理

来自分类Dev

spring 验证器不使用属性文件显示错误消息

来自分类Dev

Cloud Spring Boot如何使用yml文件中的属性

来自分类Dev

Spring Kafka-使用哪个批处理错误处理程序?

来自分类Dev

spring-cloud-stream-kafka错误处理

来自分类Dev

如何在Spring上下文文件中的属性文件中使用环境变量

来自分类Dev

我们可以在spring XML配置文件中使用apachecamel设置Kafka属性吗

来自分类Dev

如何处理spring集成中的错误

来自分类Dev

Spring XML-忽略在xml中写入的url中的$,而不使用属性文件

来自分类Dev

dpkg:存档中的文件处理错误

来自分类Dev

在php中处理文件上传的众多错误

来自分类Dev

PHP 中的错误函数与文件处理

来自分类Dev

spring-kafka KafkaListener中的并行处理和自动缩放

来自分类Dev

如何在Spring Boot应用程序中使用配置(properties / yml)文件中的属性?

来自分类Dev

在Spring Boot中如何同时使用来自不同配置文件的属性?

来自分类Dev

使用 spring 4 用传递的 VM 参数(-D 参数)替换属性文件中的占位符

来自分类Dev

如何处理批处理文件中的sqlplus错误

来自分类Dev

从Thymeleaf访问Spring中的属性文件

来自分类Dev

从Spring属性文件中读取值

来自分类Dev

从Spring MVC中的文件加载属性

来自分类Dev

在Java Spring中从文件读取属性

来自分类Dev

如何遍历Spring xml中的属性文件

来自分类Dev

属性文件中的Java Spring Maven值

来自分类Dev

在Spring Boot中访问属性文件

来自分类Dev

如何使用Spring批注将属性文件中的值注入到现有实例(不受Spring管理)的字段中?

来自分类Dev

使用多个任务处理 python 中的错误

来自分类常见问题

使用application.yml / properties的批处理侦听器的Spring Kafka集成属性

来自分类Dev

如何处理 Kafka Consumer 中的错误

Related 相关文章

  1. 1

    使用spring在属性文件中设置值

  2. 2

    使用Shopify Sarama的Kafka错误处理

  3. 3

    spring 验证器不使用属性文件显示错误消息

  4. 4

    Cloud Spring Boot如何使用yml文件中的属性

  5. 5

    Spring Kafka-使用哪个批处理错误处理程序?

  6. 6

    spring-cloud-stream-kafka错误处理

  7. 7

    如何在Spring上下文文件中的属性文件中使用环境变量

  8. 8

    我们可以在spring XML配置文件中使用apachecamel设置Kafka属性吗

  9. 9

    如何处理spring集成中的错误

  10. 10

    Spring XML-忽略在xml中写入的url中的$,而不使用属性文件

  11. 11

    dpkg:存档中的文件处理错误

  12. 12

    在php中处理文件上传的众多错误

  13. 13

    PHP 中的错误函数与文件处理

  14. 14

    spring-kafka KafkaListener中的并行处理和自动缩放

  15. 15

    如何在Spring Boot应用程序中使用配置(properties / yml)文件中的属性?

  16. 16

    在Spring Boot中如何同时使用来自不同配置文件的属性?

  17. 17

    使用 spring 4 用传递的 VM 参数(-D 参数)替换属性文件中的占位符

  18. 18

    如何处理批处理文件中的sqlplus错误

  19. 19

    从Thymeleaf访问Spring中的属性文件

  20. 20

    从Spring属性文件中读取值

  21. 21

    从Spring MVC中的文件加载属性

  22. 22

    在Java Spring中从文件读取属性

  23. 23

    如何遍历Spring xml中的属性文件

  24. 24

    属性文件中的Java Spring Maven值

  25. 25

    在Spring Boot中访问属性文件

  26. 26

    如何使用Spring批注将属性文件中的值注入到现有实例(不受Spring管理)的字段中?

  27. 27

    使用多个任务处理 python 中的错误

  28. 28

    使用application.yml / properties的批处理侦听器的Spring Kafka集成属性

  29. 29

    如何处理 Kafka Consumer 中的错误

热门标签

归档