Better way of error handling in Kafka Consumer

user2594:

I have a Spring Boot app configured with spring-kafka in which I want to handle all sorts of errors that can happen while listening to a topic. If any message is missed / cannot be consumed because of a deserialization or any other exception, it should be retried twice, after which the message should be logged to an error file. There are two approaches that can be followed:

First approach (using SeekToCurrentErrorHandler with DeadLetterPublishingRecoverer):

@Autowired
KafkaTemplate<String,Object> template;

@Bean(name = "kafkaSourceProvider")
public ConcurrentKafkaListenerContainerFactory<String, Object> consumerFactory() {
        Map<String, Object> config = appProperties.getSource()
                .getProperties();
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(config));

        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
                (r, e) -> {
                    // the destination resolver must return a TopicPartition for
                    // every record, so an else branch is required to compile
                    if (e instanceof FooException) {
                        return new TopicPartition(r.topic() + ".DLT", r.partition());
                    }
                    else {
                        return new TopicPartition(r.topic() + ".other.failures", r.partition());
                    }
                });
        ErrorHandler errorHandler = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 2L));

        factory.setErrorHandler(errorHandler);
        return factory;
    }

But this approach requires an additional topic (the new .DLT topic), from which the record can then be logged to a file.

@Bean
    public KafkaAdmin admin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                StringUtils.arrayToCommaDelimitedString(kafkaEmbedded().getBrokerAddresses()));
        return new KafkaAdmin(configs);
    }
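
With the KafkaAdmin in place, the .DLT topic can be declared as a NewTopic bean so it is created automatically at startup. A minimal sketch, assuming spring-kafka's TopicBuilder; the sizing values are assumptions, though the partition count should normally match the source topic so the recoverer can publish to the same partition number:

@Bean
public NewTopic dltTopic() {
    // partition count should match MY_TOPIC so r.partition() is always valid
    return TopicBuilder.name(MY_TOPIC + ".DLT")
            .partitions(1)
            .replicas(1)
            .build();
}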
    
@KafkaListener( topics = MY_TOPIC + ".DLT", groupId = MY_ID)
public void listenDlt(ConsumerRecord<String, SomeClassName> consumerRecord,
    @Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String exceptionStackTrace) {

    logger.error(exceptionStackTrace);
}

Second approach (using a custom SeekToCurrentErrorHandler):

@Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> consumerFactory() {
        Map<String, Object> config = appProperties.getSource()
                .getProperties();
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(config));
        
        factory.setErrorHandler(new CustomSeekToCurrentErrorHandler());
        factory.setRetryTemplate(retryTemplate());
        return factory;
    }

private RetryTemplate retryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(backOffPolicy());
    // 3 attempts = the initial call plus 2 retries
    retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3));
    return retryTemplate;
}
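
The backOffPolicy() helper referenced above is not shown in the question; a minimal sketch, assuming Spring Retry's FixedBackOffPolicy and a hypothetical one-second delay:

private BackOffPolicy backOffPolicy() {
    // wait a fixed one second between in-memory retry attempts
    FixedBackOffPolicy policy = new FixedBackOffPolicy();
    policy.setBackOffPeriod(1000L);
    return policy;
}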

public class CustomSeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {

    private static final int MAX_RETRY_ATTEMPTS = 2;

    CustomSeekToCurrentErrorHandler() {
        super(MAX_RETRY_ATTEMPTS);
    }

    @Override
    public void handle(Exception exception, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
        try {
            if (!records.isEmpty()) {
                log.warn("Exception: {} occurred with message: {}", exception, exception.getMessage());
                super.handle(exception, records, consumer, container);
            }
        } catch (SerializationException e) {
            log.warn("Exception: {} occurred with message: {}", e, e.getMessage());
        }
    }

}

Can anyone suggest the standard way to implement this kind of feature? With the first approach we see the overhead of creating .DLT topics and an additional @KafkaListener; with the second approach, we can log the failed consumer record directly.

Gary Russell:

With the first approach, it is not necessary to use a DeadLetterPublishingRecoverer; you can use any ConsumerRecordRecoverer that you want. In fact, the default recoverer simply logs the failed message:

/**
 * Construct an instance with the default recoverer which simply logs the record after
 * the backOff returns STOP for a topic/partition/offset.
 * @param backOff the {@link BackOff}.
 * @since 2.3
 */
public SeekToCurrentErrorHandler(BackOff backOff) {
    this(null, backOff);
}

And, in the FailedRecordTracker...

if (recoverer == null) {
    this.recoverer = (rec, thr) -> {
        
        ...

        logger.error(thr, "Backoff "
            + (failedRecord == null
                ? "none"
                : failedRecord.getBackOffExecution())
            + " exhausted for " + ListenerUtils.recordToString(rec));
    };
}
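
So, to meet the "log to an error file" requirement without a .DLT topic, you can pass a custom ConsumerRecordRecoverer. A minimal sketch; the "errorFile" logger name is an assumption, and routing that logger to a file is left to the logback/log4j2 configuration:

@Bean
public ErrorHandler errorHandler() {
    // hypothetical dedicated logger; bind it to a file appender in the logging config
    Logger errorFileLogger = LoggerFactory.getLogger("errorFile");
    ConsumerRecordRecoverer recoverer = (rec, ex) ->
            errorFileLogger.error("Retries exhausted for {}: {}", rec, ex.getMessage());
    // FixedBackOff(0L, 2L): no delay, two retries, then the recoverer is called
    return new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 2L));
}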

Backoff (and a limit to retries) was added to the error handler after retry support was added in the listener adapter, so it's "newer" (and preferred).

Also, using in-memory retry can cause issues with rebalancing if long BackOffs are employed.

Finally, only the SeekToCurrentErrorHandler can deal with deserialization problems (via the ErrorHandlingDeserializer).

EDIT

Use the ErrorHandlingDeserializer together with a SeekToCurrentErrorHandler. Deserialization exceptions are considered fatal and the recoverer is called immediately.

See the documentation.

Here is a simple Spring Boot application that demonstrates it:

@SpringBootApplication
public class So63236346Application {


    private static final Logger log = LoggerFactory.getLogger(So63236346Application.class);


    public static void main(String[] args) {
        SpringApplication.run(So63236346Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so63236346").partitions(1).replicas(1).build();
    }

    @Bean
    ErrorHandler errorHandler() {
        return new SeekToCurrentErrorHandler((rec, ex) -> log.error(ListenerUtils.recordToString(rec, true) + "\n"
                + ex.getMessage()));
    }

    @KafkaListener(id = "so63236346", topics = "so63236346")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so63236346", "{\"field\":\"value1\"}");
            template.send("so63236346", "junk");
            template.send("so63236346", "{\"field\":\"value2\"}");
        };
    }

}
The Thing class:

package com.example.demo;

public class Thing {

    private String field;

    public Thing() {
    }

    public Thing(String field) {
        this.field = field;
    }

    public String getField() {
        return this.field;
    }

    public void setField(String field) {
        this.field = field;
    }

    @Override
    public String toString() {
        return "Thing [field=" + this.field + "]";
    }

}
application.properties:

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.demo.Thing

Result

Thing [field=value1]
2020-08-10 14:30:14.780 ERROR 78857 --- [o63236346-0-C-1] com.example.demo.So63236346Application   : so63236346-0@7
Listener failed; nested exception is org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[106, 117, 110, 107]] from topic [so63236346]
2020-08-10 14:30:14.782  INFO 78857 --- [o63236346-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-so63236346-1, groupId=so63236346] Seeking to offset 8 for partition so63236346-0
Thing [field=value2]
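
For reference, the same ErrorHandlingDeserializer wiring can be done programmatically instead of via properties. A sketch under the same assumptions (Thing as the JSON payload type, config as the consumer property map):

DefaultKafkaConsumerFactory<String, Thing> cf = new DefaultKafkaConsumerFactory<>(
        config,
        new StringDeserializer(),
        // wrap the real deserializer so failures surface as DeserializationExceptions,
        // which the SeekToCurrentErrorHandler routes straight to the recoverer
        new ErrorHandlingDeserializer<>(new JsonDeserializer<>(Thing.class)));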
