有没有办法确定 Kafka 生产者为一组特定的消息创建的批次数?例如,如果我循环发送 10K 条消息,有没有办法检查发送了多少批次?我将“batch.size”设置为一个较高的值,我的期望是消息将被缓冲,并且在我的消费者中看到消息时会有延迟。然而,这似乎几乎立即在我的消费者程序中打印出来。
如果batch.size 的默认值是16384。这是字节数吗?
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class KafkaProducerApp {
public static void main(String[] args){
Properties properties = new Properties();
properties.put("bootstrap.servers","localhost:9092,localhost:9093,localhost:9094");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("acks","0");
properties.put("batch.size",33554432);
KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);
Map<Integer,Integer> partitionCount = new HashMap<Integer,Integer>();
partitionCount.put(0,0);
partitionCount.put(1,0);
partitionCount.put(2,0);
try{
Date from = new Date();
for(int i=0;i<10000;i++) {
RecordMetadata ack = kafkaProducer.send(new ProducerRecord<String, String>("test_topic", Integer.toString(i), "MyMessage" + Integer.toString(i))).get();
//RecordMetadata ack = kafkaProducer.send(new ProducerRecord<String,String>("test_topic",0,Integer.toString(i), "MyMessage" + Integer.toString(i))).get();
System.out.println(" Offset = " + ack.offset());
System.out.println(" Partition = " + ack.partition());
partitionCount.put(ack.partition(),partitionCount.get(ack.partition())+1);
}
Date to = new Date();
System.out.println(" partition 0 =" + partitionCount.get(0));
System.out.println(" partition 1 =" + partitionCount.get(1));
System.out.println(" partition 2 =" + partitionCount.get(2));
System.out.println(" Elapsed Time = " + (to.getTime()-from.getTime())/1000);
} catch (Exception ex){
ex.printStackTrace();
} finally {
kafkaProducer.close();
}
}
}
您要求的是生产请求的总数。
您可以使用 JMX Mbean kafka.producer:type=producer-metrics,client-id=([-.w]+) 查看每秒生产请求的平均数量
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句