Kafka - 生产者批次数

赌徒维姬

有没有办法确定 Kafka 生产者为一组特定的消息创建的批次数?例如,如果我循环发送 10K 条消息,有没有办法检查发送了多少批次?我将“batch.size”设置为一个较高的值,我的期望是消息将被缓冲,并且在我的消费者中看到消息时会有延迟。然而,这似乎几乎立即在我的消费者程序中打印出来。

如果batch.size 的默认值是16384。这是字节数吗?

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;


public class KafkaProducerApp {

    public static void main(String[] args){
        Properties properties = new Properties();
        properties.put("bootstrap.servers","localhost:9092,localhost:9093,localhost:9094");
        properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put("acks","0");
        properties.put("batch.size",33554432);

        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);
        Map<Integer,Integer> partitionCount = new HashMap<Integer,Integer>();
        partitionCount.put(0,0);
        partitionCount.put(1,0);
        partitionCount.put(2,0);


        try{
            Date from = new Date();
            for(int i=0;i<10000;i++) {
                RecordMetadata ack = kafkaProducer.send(new ProducerRecord<String, String>("test_topic", Integer.toString(i), "MyMessage" + Integer.toString(i))).get();
                //RecordMetadata ack = kafkaProducer.send(new ProducerRecord<String,String>("test_topic",0,Integer.toString(i), "MyMessage" + Integer.toString(i))).get();
                System.out.println(" Offset = " + ack.offset());
                System.out.println(" Partition = " + ack.partition());
                partitionCount.put(ack.partition(),partitionCount.get(ack.partition())+1);

            }
            Date to = new Date();
            System.out.println(" partition 0 =" + partitionCount.get(0));
            System.out.println(" partition 1 =" + partitionCount.get(1));
            System.out.println(" partition 2 =" + partitionCount.get(2));
            System.out.println(" Elapsed Time = " + (to.getTime()-from.getTime())/1000);

        } catch (Exception ex){
            ex.printStackTrace();
        } finally {
            kafkaProducer.close();
        }



    }

}
汉斯·杰斯佩森

您要求的是生产请求的总数。

您可以使用 JMX Mbean kafka.producer:type=producer-metrics,client-id=([-.w]+) 查看每秒生产请求的平均数量

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

Java Kafka生产者错误

来自分类Dev

生产者中的Kafka数据丢失

来自分类Dev

Kafka Cluster-生产者

来自分类Dev

Kafka生产者批处理超时

来自分类Dev

无法使用 dockerized 生产者生产到 Kafka

来自分类Dev

Apache Kafka 消费者-生产者混淆

来自分类Dev

Kafka 2.3.0 生产者和消费者

来自分类Dev

即使未调用生产者提交,Kafka事务性生产者也会写消息

来自分类常见问题

无法使用Springboot构建Kafka生产者

来自分类Dev

kafka生产者单元测试(java)

来自分类Dev

要在kafka中创建多少生产者?

来自分类Dev

Kafka生产者在网络分区期间的行为如何?

来自分类Dev

编译Kafka的自定义生产者

来自分类Dev

如何在Eclipse中运行kafka生产者?

来自分类Dev

Apache Kafka生产者经纪人连接

来自分类Dev

Kafka生产者创建主题,但无法发送消息

来自分类Dev

JSON列作为kafka生产者中的键

来自分类Dev

Spring Kafka:C ++生产者对ReplyingKafkaTemplate的答复

来自分类Dev

Kafka生产者将消息发布到辅助集群

来自分类Dev

同步Kafka生产者是否有任何例外

来自分类Dev

Kafka生产者在网络分区期间的行为如何?

来自分类Dev

可能的生产者将跟踪数据导入kafka

来自分类Dev

如何在Eclipse中运行kafka生产者?

来自分类Dev

Kafka生产者配置可立即发送消息

来自分类Dev

Kafka生产者创建主题,但无法发送消息

来自分类Dev

无法通过 Kafka 生产者读取文件

来自分类Dev

多线程 Kafka 生产者如何工作?

来自分类Dev

在 kafka 中的生产者之前开始流

来自分类Dev

Kafka生产者batch.size不生效