我编写了这个简单的程序来测试 Kafka 中的新事务生产者:
package test;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
class kafkatest {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "hello-world-producer");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer producer = new KafkaProducer(props);
producer.initTransactions();
producer.beginTransaction();
producer.send(new ProducerRecord<>("topic", "hello", "world"));
producer.flush();
producer.abortTransaction();
producer.close();
}
}
但是当我使用 with 时isolation.level=read_committed
,该记录会出现:
--- ~ » kafka-console-consumer --bootstrap-server localhost:9092 \
--topic topic \
--from-beginning \
--consumer-property isolation.level=read_committed
world
我错过了什么?
要read_committed
与控制台使用者一起使用,您需要指定--isolation-level
选项:
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic topic --from-beginning --isolation-level=read_committed
否则,此选项默认为read_uncommitted
并覆盖您通过 传递的值--consumer-property
。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句