Spring Kafka: transactional producer error handling

Ankush

I have a Spring Kafka application using the read-process-write pattern. I want to ensure that if there is any producer error, the Kafka transaction rolls back so that the record is re-consumed via the SeekToCurrentErrorHandler. The default behavior seems to be to log the producer error and continue processing/committing. To override this default, I implemented a ProducerListener that throws an exception from its onError method. Is this the recommended way to ensure a rollback, and is that the intent behind Spring providing the listener hook?

The logs show an exception followed by a commit (the exception did not cause a rollback):

2020-04-02 18:20:18.314|[ org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]|                                o.s.kafka.core.KafkaTemplate|TRACE|                                 456|              d3410ae8-c964-41e7-98be-6706a6f2b3b2| Sending: ProducerRecord
2020-04-02 18:20:18.345|[                      kafka-producer-network-thread | producer-13]|                           org.apache.kafka.clients.Metadata|ERROR|                                    |                                                  | [Producer clientId=producer-13, transactionalId=tx-0] Topic authorization failed for topics 
2020-04-02 18:20:18.354|[ org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]|                             o.s.k.s.LoggingProducerListener|ERROR|                                 456|              d3410ae8-c964-41e7-98be-6706a6f2b3b2| Exception thrown when sending a message with key='170854907' org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [loyalty-retail-outlet-trans-resp-dev1]
2020-04-02 18:20:18.367|[ org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]|      o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer|INFO |                                    |                                                  | Sending offsets to transaction: {loyalty-retail-outlet-earn-req-dev-5=OffsetAndMetadata{offset=2220, leaderEpoch=null, metadata=''}}
2020-04-02 18:20:18.368|[ org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]|                         o.s.k.c.DefaultKafkaProducerFactory|TRACE|                                    |                                                  | CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@5abb103c, txId=tx-0] sendOffsetsToTransaction({loyalty-retail-outlet-earn-req-dev-5=OffsetAndMetadata{offset=2220, leaderEpoch=null, metadata=''}}, earn-unit-test)
2020-04-02 18:20:18.769|[ org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]|                             o.s.k.t.KafkaTransactionManager|DEBUG|                                    |                                                  | Initiating transaction commit
2020-04-02 18:20:18.769|[ org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]|                         o.s.k.c.DefaultKafkaProducerFactory|DEBUG|                                    |                                                  | CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@5abb103c, txId=tx-0] commitTransaction()
2020-04-02 18:20:18.816|[ org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]|                         o.s.k.c.DefaultKafkaProducerFactory|TRACE|                                    |                                                  | CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@5abb103c, txId=tx-0] close(PT5S)

The record is produced with a KafkaTemplate inside a Kafka listener (read-process-write pattern).

KafkaTemplate configuration

    @Bean
    public KafkaTemplate<Integer, TransactionResponse> kafkaTemplate(
            ProducerFactory<Integer, TransactionResponse> producerFactory
            , ProducerListener<Integer, TransactionResponse> producerListener) {
        KafkaTemplate<Integer, TransactionResponse> kafkaTemplate = new KafkaTemplate<>(producerFactory);
//        kafkaTemplate.setProducerListener(producerListener);
        return kafkaTemplate;
    }

application.yml

spring:
  kafka:
    producer:
      transaction-id-prefix: tx-
      acks: all
      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      properties:
        enable.idempotence: true
        delivery.timeout.ms: 180000

Listener

    @KafkaListener(topics = "${earn.request.topic}", clientIdPrefix = "EarnConsumer", containerFactory = "earnListenerContainerFactory")
    public void listen(List<TransactionRequest> requestList,
                       @Header(KafkaHeaders.GROUP_ID) String groupId,
                       @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partitions,
                       @Header(KafkaHeaders.OFFSET) String offsets,
                       @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

Send-response method (invoked from the listener code)

    public void sendResponse(TransactionResponse transactionResponse) {
        kafkaTemplate.send(earnResponseTopic, transactionResponse.getEventSummary().getMemberId(), transactionResponse);
    }

Container configuration

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Integer, EarnTransactionRequest> earnListenerContainerFactory(
            ConsumerFactory<Integer, EarnTransactionRequest> consumerFactory
            , SeekToCurrentBatchErrorHandler seekToCurrentBatchErrorHandler
            , KafkaTransactionManager ktm
    ) {
        ConcurrentKafkaListenerContainerFactory<Integer, EarnTransactionRequest> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory);
        containerFactory.setBatchListener(true);
        containerFactory.setBatchErrorHandler(seekToCurrentBatchErrorHandler);
        containerFactory.setConcurrency(numConcurrentConsumers);

        containerFactory.getContainerProperties().setTransactionManager(ktm);
        containerFactory.getContainerProperties().setAckOnError(false);
        containerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
        containerFactory.getContainerProperties().setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
        containerFactory.getContainerProperties().setLogContainerConfig(true);
        containerFactory.getContainerProperties().setMissingTopicsFatal(true);

        return containerFactory;
    }

EDIT: simplified application

    @Component
    public class QuickTest {

        private final String responseTopic;
        private final KafkaTemplate<Integer, TransactionResponse> kafkaTemplate;

        public QuickTest(@Value("${response.topic}") String responseTopic
                , KafkaTemplate<Integer, TransactionResponse> kafkaTemplate) {
            this.responseTopic = responseTopic;
            this.kafkaTemplate = kafkaTemplate;
        }

        @KafkaListener(topics = "${request.topic}", clientIdPrefix = "Consumer")
        public void listen(TransactionRequest requestList) {
            kafkaTemplate.send(responseTopic, 123456789, null);
        }
    }

Logs from the start of one transaction to the start of the next


2020-04-03 19:04:54.901|[ org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]|      o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer|TRACE|Processing ConsumerRecord(topic = req-dev, partition = 1, leaderEpoch = 4, offset = 2185, CreateTime = 1585642853682, serialized key size = -1, serialized value size = 184, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {})

2020-04-03 19:04:54.901|[ org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]|                             o.s.k.t.KafkaTransactionManager|DEBUG|Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT

2020-04-03 19:04:54.901|[ org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]|                         o.s.k.c.DefaultKafkaProducerFactory|DEBUG|CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@43926a5b, txId=transactionx-g21.req-dev.1] beginTransaction()

2020-04-03 19:04:54.901|[ org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]|                             o.s.k.t.KafkaTransactionManager|DEBUG|Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@43926a5b, txId=transactionx-g21.req-dev.1]]

2020-04-03 19:04:54.902|[ org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]|             o.s.k.l.a.RecordMessagingMessageListenerAdapter|DEBUG|Processing [GenericMessage [payload={"eventSummary": {"eventId": "102"}}]]

2020-04-03 19:04:54.902|[ org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]|                                o.s.kafka.core.KafkaTemplate|TRACE|Sending: ProducerRecord(topic= resp-test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=123456789, value=null, timestamp=null)

2020-04-03 19:04:54.902|[ org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]|                         o.s.k.c.DefaultKafkaProducerFactory|TRACE|CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@43926a5b, txId=transactionx-g21.req-dev.1] send(ProducerRecord(topic= resp-test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=123456789, value=null, timestamp=null))

2020-04-03 19:04:54.928|[                       kafka-producer-network-thread | producer-8]|                        o.apache.kafka.clients.NetworkClient|WARN |[Producer clientId=producer-8, transactionalId=transactionx-g21.req-dev.1] Error while fetching metadata with correlation id 22 : { resp-test=TOPIC_AUTHORIZATION_FAILED}

2020-04-03 19:04:54.928|[                       kafka-producer-network-thread | producer-8]|                           org.apache.kafka.clients.Metadata|ERROR|[Producer clientId=producer-8, transactionalId=transactionx-g21.req-dev.1] Topic authorization failed for topics [ resp-test]

2020-04-03 19:04:54.928|[ org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]|                             o.s.k.s.LoggingProducerListener|ERROR|Exception thrown when sending a message with key='123456789' and payload='null' to topic  resp-test:

org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [ resp-test]
2020-04-03 19:04:54.928|[ org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]|                                o.s.kafka.core.KafkaTemplate|DEBUG|Failed to send: ProducerRecord(topic= resp-test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=123456789, value=null, timestamp=null)

org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [ resp-test]
2020-04-03 19:04:54.928|[ org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]|                                o.s.kafka.core.KafkaTemplate|TRACE|Sent: ProducerRecord(topic= resp-test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=123456789, value=null, timestamp=null)

2020-04-03 19:04:54.928|[ org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]|      o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer|TRACE|Ack: ConsumerRecord(topic = req-dev, partition = 1, leaderEpoch = 4, offset = 2185, CreateTime = 1585642853682, serialized key size = -1, serialized value size = 184, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"eventSummary": {"eventId": "102"}})

2020-04-03 19:04:54.928|[ org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]|      o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer|DEBUG|Sending offsets to transaction: {req-dev-1=OffsetAndMetadata{offset=2186, leaderEpoch=null, metadata=''}}

2020-04-03 19:04:54.928|[ org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]|                         o.s.k.c.DefaultKafkaProducerFactory|TRACE|CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@43926a5b, txId=transactionx-g21.req-dev.1] sendOffsetsToTransaction({req-dev-1=OffsetAndMetadata{offset=2186, leaderEpoch=null, metadata=''}}, g21)

2020-04-03 19:04:55.043|[ org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]|                             o.s.k.t.KafkaTransactionManager|DEBUG|Initiating transaction commit

2020-04-03 19:04:55.043|[ org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]|                         o.s.k.c.DefaultKafkaProducerFactory|DEBUG|CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@43926a5b, txId=transactionx-g21.req-dev.1] commitTransaction()

2020-04-03 19:04:55.090|[ org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]|                         o.s.k.c.DefaultKafkaProducerFactory|TRACE|CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@43926a5b, txId=transactionx-g21.req-dev.1] close(PT5S)

2020-04-03 19:04:55.091|[ org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]|      o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer|TRACE|Processing ConsumerRecord(topic = req-dev, partition = 1, leaderEpoch = 4, offset = 2186, CreateTime = 1585644055280, serialized key size = -1, serialized value size = 184, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"eventSummary": {"eventId": "104"})

2020-04-03 19:04:55.091|[ org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]|                             o.s.k.t.KafkaTransactionManager|DEBUG|Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT

2020-04-03 19:04:55.091|[ org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]|                         o.s.k.c.DefaultKafkaProducerFactory|DEBUG|CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@43926a5b, txId=transactionx-g21.req-dev.1] beginTransaction()
Gary Russell

The error handler runs within the transaction. You should leave it unset; the AfterRollbackProcessor will re-seek the records. See the "Transactions" chapter in the reference manual.

The container needs a KafkaTransactionManager.

See the After-rollback Processor section in the documentation.

You don't need to do anything in the ProducerListener.
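
Concretely, wiring an after-rollback processor into the container factory from the question might look like this sketch. The back-off values are assumptions for illustration; by default the container already uses a DefaultAfterRollbackProcessor.

```java
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
import org.springframework.util.backoff.FixedBackOff;

// Inside the container factory bean: after a rollback, re-seek the unprocessed
// records so they are redelivered; here, up to 2 retries with no delay.
containerFactory.setAfterRollbackProcessor(
        new DefaultAfterRollbackProcessor<>(new FixedBackOff(0L, 2L)));
```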

EDIT

I added authorization configuration to reproduce the TopicAuthorizationException, and everything works the way I expected (the commit fails)...

    @KafkaListener(id = "ktest24", topics = "ktest24")
    public void listen(String in) {
        System.out.println("1:" + in);
        this.template.send("ktest24-out", "sendInTx");
    }
1:foo
2020-04-03 14:10:26.619 ERROR 75695 --- [est24.ktest24.0] o.s.k.support.LoggingProducerListener   
 : Exception thrown when sending a message with key='null' and payload='sendInTx' to topic ktest24-out:

org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [ktest24-out]

2020-04-03 14:10:26.619 ERROR 75695 --- [  ktest24-0-C-1] essageListenerContainer$ListenerConsumer
 : Send offsets to transaction failed

org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [ktest24-out]

2020-04-03 14:10:26.620 ERROR 75695 --- [  ktest24-0-C-1] o.s.k.core.DefaultKafkaProducerFactory  
 : commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@84412c5, txId=tx-ktest24.ktest24.0]
2020-04-03 14:10:31.627 ERROR 75695 --- [  ktest24-0-C-1] essageListenerContainer$ListenerConsumer
 : Transaction rolled back
1:foo
...
