Kafka Transaction Manager sends to Kafka Broker despite transaction rolling back

Lekkie

My Kafka producer keeps sending messages to the Kafka broker even though the transaction fails. I have a custom listener, i.e. I am not using the @KafkaListener annotation. This is running on spring-kafka 2.2.x.

Any ideas why the message ends up in Kafka even though KafkaTransactionManager rolls back? Here is my setup:

// Kafka producer sender
@Transactional(transactionManager = "kafkaTransactionManager", propagation = Propagation.REQUIRED)
public void sendToKafkaWithTransaction(final String topic, final Object payload){
    ProducerRecord<String, Object> record = new ProducerRecord<>(topic, key, payload);
    template.executeInTransaction(kt -> kt.send(record));
}

// RabbitMQ producer sender
@Transactional(transactionManager = "rabbitTransactionManager", propagation = Propagation.REQUIRED)
public void sendToRabbitmqWithTransaction(final String topic, final String header, final Object payload){
    template.convertAndSend(topic, header, payload);
}

// Chained Transaction Manager
@Bean(name = "chainedKafkaTransactionManager")
public ChainedKafkaTransactionManager<Object, Object> chainedKafkaTransactionManager(
       @Qualifier(value = "transactionalKafkaProducer") ProducerFactory<String, Object> producerFactory,
       @Qualifier(value = "transactionManager") JpaTransactionManager jpaTransactionManager,
       @Qualifier(value = "rabbitTransactionManager") RabbitTransactionManager rabbitTransactionManager) {
   KafkaTransactionManager producerKtm = new KafkaTransactionManager(producerFactory);
   producerKtm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
   return new ChainedKafkaTransactionManager<>(jpaTransactionManager, producerKtm, rabbitTransactionManager);
}


// Listener config
listenerFactory.getContainerProperties().setTransactionManager(chainedKafkaTransactionManager);



// Listener
@Transactional(transactionManager = "chainedKafkaTransactionManager")
public void onMessage(final ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
    try {
        RetryState retryState = new DefaultRetryState(consumerRecord.topic() + "-" + consumerRecord.partition() + "-" + consumerRecord.offset());

        retryTemplate.execute(context -> {
            saveToDb(); // This rolls back
            sendToKafkaWithTransaction(topic, payload); // This message gets to Kafka; it should not.
            sendToRabbitmqWithTransaction(topic, payload); // This rolls back
            throw new Exception("Out of Anger");
        }, recoveryCallBack, retryState);

        acknowledgment.acknowledge();
    }
    catch (ListenerExecutionFailedException e) {
        throw e;
    }
}

// See logs
[ consumer-0-C-1] o.s.a.r.t.RabbitTransactionManager       : Participating transaction failed - marking existing transaction as rollback-only
[ consumer-0-C-1] o.s.k.t.KafkaTransactionManager          : Participating transaction failed - marking existing transaction as rollback-only
[ consumer-0-C-1] o.s.orm.jpa.JpaTransactionManager        : Participating transaction failed - marking existing transaction as rollback-only
[ consumer-0-C-1] o.s.orm.jpa.JpaTransactionManager        : Setting JPA transaction on EntityManager [SessionImpl(104745239<open>)] rollback-only

EDIT: Adding spring boot config:

spring.kafka:
  admin:
    bootstrap-servers: ${kakfa.host}
  consumer:
    group-id: test-consumers
    client-id: test-consumers
    auto-offset-reset: latest
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    enable-auto-commit: false
    properties:
        isolation-level: read_committed
  producer:
    client-id: test-producer
    acks: all
    retries: 3
    transaction-id-prefix: test-producer-tx-
    value-serializer: org.apache.kafka.common.serialization.StringSerializer
    properties:
      enable.idempotence: true
      transactional.id: tran-id-1-
      max.in.flight.requests.per.connection: 5
      isolation-level: read_committed

EDIT: More logs:

[ consumer-0-C-1] o.s.t.i.TransactionInterceptor           : Completing transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.saveAndFlush]
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Removed value [org.springframework.data.jpa.repository.support.CrudMethodMetadataPostProcessor$DefaultCrudMethodMetadata@18061927] for key [public abstract java.lang.Object org.springframework.data.jpa.repository.JpaRepository.saveAndFlush(java.lang.Object)] from thread [consumer-0-C-1]
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.kafka.core.KafkaResourceHolder@23b172f4] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@4df8f4e9] bound to thread [consumer-0-C-1]
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.kafka.core.KafkaResourceHolder@23b172f4] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@4df8f4e9] bound to thread [consumer-0-C-1]
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.kafka.core.KafkaResourceHolder@23b172f4] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@4df8f4e9] bound to thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.kafka.core.KafkaTemplate             : Sending: ProducerRecord(topic=topic-1, partition=null)
[-27cf188e6c23-1] org.apache.kafka.clients.Metadata        : Cluster ID: r3baK471R6mIft7L_DIOIg
[ consumer-0-C-1] o.s.kafka.core.KafkaTemplate             : Sent: ProducerRecord(topic=topic-1, partition=null)
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.orm.jpa.EntityManagerHolder@16bfeffa] for key [org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean@30eed725] bound to thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.orm.jpa.JpaTransactionManager        : Found thread-bound EntityManager [SessionImpl(23309560<open>)] for JPA transaction
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.jdbc.datasource.ConnectionHolder@cbfb10d] for key [HikariDataSource (HikariPool-1)] bound to thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.orm.jpa.JpaTransactionManager        : Participating in existing transaction
[ consumer-0-C-1] o.s.t.i.TransactionInterceptor           : Getting transaction for [com.arca.framework.messaging.services.impl.BoradcastMessageServiceImpl.sendTransactional]
[-27cf188e6c23-1] o.s.kafka.core.KafkaTemplate             : Sent ok: ProducerRecord(topic=topic-1, partition=null), metadata: topic-1-0@185
[ consumer-0-C-1] o.s.a.r.c.CachingConnectionFactory       : Creating cached Rabbit Channel from AMQChannel(amqp://[email protected]:5672/,4)
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Bound value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@2655e199] for key [CachingConnectionFactory [channelCacheSize=25, host=localhost, port=5672, active=true rabbitConnectionFactory]] to thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.amqp.rabbit.core.RabbitTemplate      : Executing callback RabbitTemplate$$Lambda$1237/634386320 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,4), conn: Proxy@3c964873 Shared Rabbit Connection: SimpleConnection@5d9ccad2 [delegate=amqp://[email protected]:5672/, localPort= 64338]
[ consumer-0-C-1] o.s.amqp.rabbit.core.RabbitTemplate      : Publishing message (Body:'{ }')
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@2655e199] for key [CachingConnectionFactory [channelCacheSize=25, host=localhost, port=5672, active=true rabbitConnectionFactory]] bound to thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.t.i.TransactionInterceptor           : Completing transaction for [messaging.services.impl.RabbitMessageServiceImpl.send]
[ consumer-0-C-1] c.a.f.m.k.r.KafkaSingleDispatchReceiver  : Unable to process messages of type: [class messaging.kafka.events.acquiringtmstransaction.TmsTransactionEvent] and id: [92dccb48-2cd2-47b8-b778-8550dcd72d04]
[ consumer-0-C-1] .a.f.m.k.c.KafkaTransactionalRetryPolicy : Retry count [1] for message [{}]
[ consumer-0-C-1] o.s.t.i.TransactionInterceptor           : Completing transaction for [messaging.kafka.receivers.KafkaReceiver.onMessage] after exception: exceptions.MyException: Out of anger
[ consumer-0-C-1] o.s.t.i.RuleBasedTransactionAttribute    : Applying rules to determine whether transaction should rollback on exceptions.MyException: Out of anger
[ consumer-0-C-1] o.s.t.i.RuleBasedTransactionAttribute    : Winning rollback rule is: null
[ consumer-0-C-1] o.s.t.i.RuleBasedTransactionAttribute    : No relevant rollback rule found: applying default rules
[ consumer-0-C-1] o.s.k.t.KafkaTransactionManager          : Triggering beforeCompletion synchronization
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Removed value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@2655e199] for key [CachingConnectionFactory [channelCacheSize=25, host=localhost, port=5672, active=true rabbitConnectionFactory]] from thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.k.t.KafkaTransactionManager          : Initiating transaction rollback
[ consumer-0-C-1] o.s.k.core.DefaultKafkaProducerFactory   : abortTransaction: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@b70782d, txId=tran-id-1-acquiring-tms-transaction-consumers.pos_txn_log.0]
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Clearing transaction synchronization
[ consumer-0-C-1] o.s.k.t.KafkaTransactionManager          : Triggering afterCompletion synchronization
[ consumer-0-C-1] o.s.a.r.connection.RabbitResourceHolder  : Rolling back messages to channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,4), conn: Proxy@3c964873 Shared Rabbit Connection: SimpleConnection@5d9ccad2 [delegate=amqp://[email protected]:5672/, localPort= 64338]
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Removed value [org.springframework.kafka.core.KafkaResourceHolder@23b172f4] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@4df8f4e9] from thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.k.t.KafkaTransactionManager          : Resuming suspended transaction after completion of inner transaction
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Initializing transaction synchronization
[ consumer-0-C-1] o.s.orm.jpa.JpaTransactionManager        : Participating transaction failed - marking existing transaction as rollback-only
[ consumer-0-C-1] o.s.orm.jpa.JpaTransactionManager        : Setting JPA transaction on EntityManager [SessionImpl(23309560<open>)] rollback-only
[ consumer-0-C-1] essageListenerContainer$ListenerConsumer : Error handler threw an exception

Gary Russell

That's the way Kafka transactions work. Published records are always written to the log, followed by a marker record that indicates whether the transaction committed or rolled back.

To avoid seeing the rolled-back records, you have to set the consumer isolation.level property to read_committed (it is read_uncommitted by default).
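
To make the marker mechanism concrete, here is a minimal sketch in plain Java. It is a toy model and not the Kafka client API: the class TxLogSketch and all of its methods are hypothetical names invented for illustration, and it assumes each transaction has a unique id (the real broker tracks producer ids, epochs and a last stable offset instead).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of one partition log. This is NOT the Kafka client API. */
final class TxLogSketch {

    enum Kind { DATA, COMMIT, ABORT }

    /** A log entry: either a data record or a control marker ending a transaction. */
    static final class Entry {
        final String txId;
        final String value; // null for markers
        final Kind kind;

        Entry(String txId, String value, Kind kind) {
            this.txId = txId;
            this.value = value;
            this.kind = kind;
        }
    }

    private final List<Entry> log = new ArrayList<>();

    /** Data records are appended immediately, before the outcome is known. */
    void send(String txId, String value) {
        log.add(new Entry(txId, value, Kind.DATA));
    }

    void commit(String txId) {
        log.add(new Entry(txId, null, Kind.COMMIT));
    }

    void abort(String txId) {
        log.add(new Entry(txId, null, Kind.ABORT));
    }

    /** read_uncommitted: every data record is returned, whatever the marker says. */
    List<String> readUncommitted() {
        List<String> out = new ArrayList<>();
        for (Entry e : log) {
            if (e.kind == Kind.DATA) {
                out.add(e.value);
            }
        }
        return out;
    }

    /** read_committed: only data records whose transaction has a COMMIT marker. */
    List<String> readCommitted() {
        Set<String> committed = new HashSet<>();
        for (Entry e : log) {
            if (e.kind == Kind.COMMIT) {
                committed.add(e.txId);
            }
        }
        List<String> out = new ArrayList<>();
        for (Entry e : log) {
            if (e.kind == Kind.DATA && committed.contains(e.txId)) {
                out.add(e.value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        TxLogSketch log = new TxLogSketch();
        log.send("tx-1", "rolled back");
        log.abort("tx-1");
        log.send("tx-2", "committed");
        log.commit("tx-2");
        System.out.println(log.readUncommitted()); // [rolled back, committed]
        System.out.println(log.readCommitted());   // [committed]
    }
}
```

The rolled-back record is still physically in the log; only the reader's isolation level decides whether it is handed to the application.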

EDIT

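It's because you are starting a new, local transaction instead of joining the one started by the container:

template.executeInTransaction(kt -> kt.send(record));

The Javadoc for executeInTransaction() says so explicitly: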
/**
 * Execute some arbitrary operation(s) on the operations and return the result.
 * The operations are invoked within a local transaction and do not participate
 * in a global transaction (if present).
 * @param callback the callback.
 * @param <T> the result type.
 * @return the result.
 * @since 1.1
 */
@Nullable
<T> T executeInTransaction(OperationsCallback<K, V, T> callback);

Just call template.send() and the template will participate in the transaction started by the container.

You can also remove the @Transactional from that method.
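
The difference between the two calls can be sketched with a toy model in plain Java. Nothing here is Spring code: LocalVsParticipatingSketch and its methods are hypothetical names, and the model assumes that a local transaction commits as soon as its callback returns, while a participating send shares the fate of the outer transaction.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of local versus participating sends. This is NOT the Spring Kafka API. */
final class LocalVsParticipatingSketch {

    /** Records that a read_committed consumer would see. */
    final List<String> committed = new ArrayList<>();

    /** Records buffered in the outer transaction (the one the container started). */
    private final List<String> outerTx = new ArrayList<>();

    /** Models template.send(): the record joins the outer transaction. */
    void sendParticipating(String value) {
        outerTx.add(value);
    }

    /** Models template.executeInTransaction(): a separate transaction, committed on return. */
    void sendInLocalTransaction(String value) {
        List<String> localTx = new ArrayList<>();
        localTx.add(value);
        committed.addAll(localTx); // commits independently of the outer transaction
    }

    void commitOuter() {
        committed.addAll(outerTx);
        outerTx.clear();
    }

    void rollbackOuter() {
        outerTx.clear(); // a local transaction that already committed is unaffected
    }

    public static void main(String[] args) {
        LocalVsParticipatingSketch sketch = new LocalVsParticipatingSketch();
        sketch.sendParticipating("joined outer tx");
        sketch.sendInLocalTransaction("local tx");
        sketch.rollbackOuter(); // the listener threw an exception
        System.out.println(sketch.committed); // [local tx]
    }
}
```

In this model only the record sent through the local transaction survives the rollback, which matches the behaviour described in the question.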

EDIT2

This works as expected for me...

spring.kafka.producer.transaction-id-prefix=tx-

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.isolation-level=read-committed

logging.level.org.springframework.transaction=trace
logging.level.org.springframework.kafka.core=trace

@SpringBootApplication
@EnableTransactionManagement
public class So66306109Application {

    public static void main(String[] args) {
        SpringApplication.run(So66306109Application.class, args);
    }

    @Autowired
    Foo foo;

    @Transactional // Not really needed; the container has already started the transaction
    @KafkaListener(id = "so66306109", topics = "so66306109")
    public void listen(String in) {
        System.out.println(in);
        this.foo.send(in.toUpperCase());
        throw new RuntimeException("test");
    }


    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so66306109").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("so66306109-2").partitions(1).replicas(1).build();
    }


    @KafkaListener(id = "so66306109-2", topics = "so66306109-2")
    public void listen2(String in) {
        System.out.println(in);
    }

}

@Component
class Foo {

    @Autowired
    KafkaTemplate<String, String> template;

    @Transactional // Not needed - we're already in a transaction
    void send(String in) {
        this.template.send("so66306109-2", in);
    }

}

EDIT3

If you cannot upgrade to a supported version, you need to disable transactions in the container and manage them yourself in your code, within the scope of the retry template's execute() call.

Here is an example.

@SpringBootApplication
@EnableTransactionManagement
public class So66306109Application {

    public static void main(String[] args) {
        SpringApplication.run(So66306109Application.class, args);
    }

    @Autowired
    Foo foo;

    @Autowired
    RetryTemplate template;

    @KafkaListener(id = "so66306109", topics = "so66306109")
    public void listen(ConsumerRecord<String, String> in) {
        this.template.execute(context -> {
            System.out.println(in);
            this.foo.send(in);
            return null;
        }, context -> {
            System.out.println("RETRIES EXHAUSTED");
            return null;
        });
    }


    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so66306109").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("so66306109-2").partitions(1).replicas(1).build();
    }

    @KafkaListener(id = "so66306109-2", topics = "so66306109-2")
    public void listen2(String in) {
        System.out.println(in);
    }

    @Bean
    ChainedKafkaTransactionManager<String, String> chainedTm(KafkaTransactionManager<String, String> ktm,
            ConcurrentKafkaListenerContainerFactory<?, ?> factory) {

        // transactions can't be started by the container
        factory.getContainerProperties().setTransactionManager(null);
        return new ChainedKafkaTransactionManager<>(ktm);
    }

    @Bean
    public RetryTemplate template() {
        return new RetryTemplate();
    }

}

@Component
class Foo {

    @Autowired
    KafkaTemplate<String, String> template;

    @Autowired
    ProducerFactory<String, String> pf;

    @Transactional("chainedTm")
    public void send(ConsumerRecord<String,String> in) {
        // updateDB
        this.template.send(new ProducerRecord<String, String>("so66306109-2", null, null, in.value().toUpperCase()));
        this.template.sendOffsetsToTransaction(Collections.singletonMap(new TopicPartition(in.topic(), in.partition()),
                new OffsetAndMetadata(in.offset() + 1)));

        // simulate a DB rollback
        KafkaResourceHolder<String, String> resource = (KafkaResourceHolder<String, String>) TransactionSynchronizationManager
                .getResource(this.pf);
        resource.setRollbackOnly();
    }

}

Note: you must NOT manually acknowledge such records; instead, send the offset to the transaction before it is committed.
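
The in.offset() + 1 in the example is there because a committed offset in Kafka names the next record to read, not the last one processed. A minimal sketch of that arithmetic in plain Java follows; OffsetCommitSketch and its methods are hypothetical names, and the partition is modelled as a simple list whose index is the offset.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model of committed-offset arithmetic. This is NOT the Kafka client API. */
final class OffsetCommitSketch {

    /** The offset to commit after processing a record is the offset of the next record. */
    static long offsetToCommit(long processedOffset) {
        return processedOffset + 1;
    }

    /** Records a consumer would receive when it resumes from the committed offset. */
    static List<String> resumeFrom(List<String> partition, long committedOffset) {
        return partition.subList((int) committedOffset, partition.size());
    }

    public static void main(String[] args) {
        List<String> partition = Arrays.asList("r0", "r1", "r2");
        // Processed the record at offset 1, then committed offset 2.
        System.out.println(resumeFrom(partition, offsetToCommit(1))); // [r2]
        // Committing the processed offset itself would redeliver r1.
        System.out.println(resumeFrom(partition, 1)); // [r1, r2]
    }
}
```

If the transaction rolls back, the offset sent to it is discarded along with the produced records, so the input record is delivered again.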
