在Kafka Listener
遭受打击之前,我遇到了很多反序列化失败的事情。我一直在研究Gary Russel建造的东西,但是在使它工作时遇到了问题。我所有的东西都是通过属性文件配置的。
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=io.confluent.kafka.serializers.KafkaAvroDeserializer
因此,如果添加这些内容,我的理解是将错误包装在消费者记录的标题中吗?我的最终目标是使任何反序列化异常都击中我拥有的某些自定义类,以便我可以处理该问题。IE,转发到我的死信处理程序,该程序将失败的数据上传到s3。
我尝试将错误处理程序标志添加到kafkalistener,但这也无济于事。
我已经更新了配置,对我来说仍然不清楚。它没有用,所以我认为没有用。
没有自定义代码被调用
spring.kafka.consumer.properties.value.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
spring.kafka.consumer.properties.key.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.function=com.thing.cyclic.service.FailedFooProvider
spring.kafka.consumer.properties.spring.deserializer.key.delegate.class=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicNameStrategy
spring.kafka.consumer.properties.specific.avro.reader=true
spring.kafka.consumer.properties.auto.register.schemas=false
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.listener.ack-mode=manual_immediate
public class BadFoo {
private final FailedDeserializationInfo failedDeserializationInfo;
public BadFoo(FailedDeserializationInfo failedDeserializationInfo) {
this.failedDeserializationInfo = failedDeserializationInfo;
}
public FailedDeserializationInfo getFailedDeserializationInfo() {
return this.failedDeserializationInfo;
}
}
public class FailedFooProvider implements Function<FailedDeserializationInfo, String> {
@Override
public String apply(FailedDeserializationInfo info) {
System.out.println("");
return "";
}
}
还要看一下DeadLetterPublishingRecoverer
代码,该代码可用于将失败的记录发布到另一个主题。之后,您可以对代码建模,以获得包含failed的头byte[]
。
恢复器与结合使用SeekToCurrentErrorHandler
。
将错误处理程序配置为a @Bean
,Spring Boot会自动将其连接到容器中。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句