How to enable Kafka Producer Metrics in Spark?

Martin Peng

We are using Kafka 0.10 with Spark 2.1, and I found that our producer was always slow to publish messages. I can only reach around 1k messages/s after giving 8 cores to the Spark executors, while other posts say they can reach millions per second easily. I tried tuning linger.ms and batch.size to find out why. However, linger.ms=0 looks optimal for me, and batch.size doesn't have much effect. I was sending 160k events per iteration. It looks like I have to enable the Kafka producer metrics to see exactly what is happening, but it doesn't seem easy to enable them in a Spark executor.
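For reference, linger.ms and batch.size are ordinary producer properties passed to the KafkaProducer constructor. The following is a minimal sketch of assembling them in Java, assuming String keys and values as in the code below; the class ProducerConfigSketch and the broker address localhost:9092 are hypothetical placeholders. In the Kafka producer, linger.ms defaults to 0 and batch.size (bytes per partition batch) defaults to 16384.

```java
import java.util.Properties;

// Hypothetical helper: builds the Properties a KafkaProducer<String, String> could be created with.
final class ProducerConfigSketch {
    private ProducerConfigSketch() {}

    static Properties producerProps(String bootstrapServers, int lingerMs, int batchSizeBytes) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Serializer class names from kafka-clients; only the names are referenced here.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // How long to wait for more records before sending a batch (0 = send as soon as possible).
        props.put("linger.ms", Integer.toString(lingerMs));
        // Upper bound, in bytes, of one batch for a single partition.
        props.put("batch.size", Integer.toString(batchSizeBytes));
        return props;
    }
}
```

With kafka-clients on the classpath, the result would be passed as `new KafkaProducer<String, String>(ProducerConfigSketch.producerProps("localhost:9092", 0, 16384))`.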

Could anyone shed some light on this?

My code looks like this:

import java.util.concurrent.TimeUnit
import scala.util.{Failure, Success, Try}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

private def publishMessagesAttempt(producer: KafkaProducer[String, String], topic: String, messages: Iterable[(String, String)], producerMaxDelay: Long,
                                   individualMessageMaxDelay: Long, logger: (String, Boolean) => Unit = KafkaClusterUtils.DEFAULT_LOGGER): Iterable[(String, String)] = {
  // Send every (key, value) pair; send() returns a java.util.concurrent.Future[RecordMetadata]
  val futureMessages = messages.map(message => (message, producer.send(new ProducerRecord[String, String](topic, message._1, message._2))))
  val messageSentTime = System.currentTimeMillis
  // Wait for each future with the remaining overall budget, but never less than individualMessageMaxDelay
  val awaitedResults = futureMessages.map { case (message, future) =>
    val waitFor = Math.max(producerMaxDelay - (System.currentTimeMillis - messageSentTime), individualMessageMaxDelay)
    val failed = Try(future.get(waitFor, TimeUnit.MILLISECONDS)) match {
      case Success(_) => false
      case Failure(f) =>
        logger(s"Error happened when publish to Kafka: ${f.getStackTraceString}", true)
        true
    }
    (message, failed)
  }
  // Return only the messages that failed, so the caller can retry them
  awaitedResults.filter(_._2).map(_._1)
}
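The waiting logic in publishMessagesAttempt gives each future whatever remains of the overall producerMaxDelay budget, measured from when the sends were issued, but never less than individualMessageMaxDelay. The following is a stdlib-only Java sketch of that send-then-await pattern, not the asker's code: the class SendThenAwait and the send function (a stand-in for producer.send, which returns a java.util.concurrent.Future) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical Java rendering of the send-then-await pattern in publishMessagesAttempt.
final class SendThenAwait {
    private SendThenAwait() {}

    // Returns the messages whose send failed or did not finish in time.
    static <M> List<M> failedMessages(List<M> messages, Function<M, Future<?>> send,
                                      long producerMaxDelay, long individualMessageMaxDelay) {
        // Issue every send before waiting on any of them.
        List<Future<?>> futures = new ArrayList<>();
        for (M message : messages) {
            futures.add(send.apply(message));
        }
        long sentTime = System.currentTimeMillis();

        List<M> failed = new ArrayList<>();
        for (int i = 0; i < messages.size(); i++) {
            // Remaining overall budget, but never less than the per-message minimum.
            long elapsed = System.currentTimeMillis() - sentTime;
            long waitFor = Math.max(producerMaxDelay - elapsed, individualMessageMaxDelay);
            try {
                futures.get(i).get(waitFor, TimeUnit.MILLISECONDS);
            } catch (ExecutionException | TimeoutException | CancellationException e) {
                failed.add(messages.get(i));
            } catch (InterruptedException e) {
                // Restore the interrupt flag and count this message as failed.
                Thread.currentThread().interrupt();
                failed.add(messages.get(i));
            }
        }
        return failed;
    }
}
```

Because the futures are collected into a list first, every send in this sketch is issued before the first wait. In the Scala original the same ordering holds only when messages is a strict collection; on a lazy one such as a Stream, the two map calls can interleave sends and waits.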
Martin Peng

I finally found the answer.

1. KafkaProducer has a metrics() method that returns the producer's metrics. Simply printing them should be enough.

Code like this should work:

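import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MetricsProducerReporter implements Runnable {
    // Any key/value types work: metrics() does not depend on them
    private final Producer<?, ?> producer;
    private final Logger logger =
            LoggerFactory.getLogger(MetricsProducerReporter.class);

    // Used to filter just the metrics we want
    private final Set<String> metricsNameFilter = new HashSet<>(Arrays.asList(
            "record-queue-time-avg", "record-send-rate", "records-per-request-avg",
            "request-size-max", "network-io-rate",
            "incoming-byte-rate", "batch-size-avg", "response-rate", "requests-in-flight"
    ));

    public MetricsProducerReporter(final Producer<?, ?> producer) {
        this.producer = producer;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            final Map<MetricName, ? extends Metric> metrics = producer.metrics();
            displayMetrics(metrics);
            try {
                Thread.sleep(3_000);
            } catch (InterruptedException e) {
                logger.warn("metrics interrupted");
                // Restore the interrupt flag so the owner of this thread can see it
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    // Log each selected metric as group.name = value
    // (Metric.value() is the 0.10 API; newer clients provide metricValue())
    private void displayMetrics(final Map<MetricName, ? extends Metric> metrics) {
        metrics.forEach((name, metric) -> {
            if (metricsNameFilter.contains(name.name())) {
                logger.info("{}.{} = {}", name.group(), name.name(), metric.value());
            }
        });
    }
}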
2. My code was slow because Scala's map does not run in parallel by default. I have to use messages.par.map() to get parallelism.
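In Java, the closest standard-library counterpart to Scala's messages.par.map(...) is a parallel stream. The following is a sketch of a hypothetical helper (ParallelMapSketch is not from the original answer) contrasting a sequential map with a parallel one; with a parallel stream the function runs on ForkJoinPool.commonPool() worker threads as well as the calling thread.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical helper contrasting a sequential map with a parallel one.
final class ParallelMapSketch {
    private ParallelMapSketch() {}

    // Sequential: applies f to one element at a time, like a plain Scala map.
    static <A, B> List<B> mapSequential(List<A> input, Function<A, B> f) {
        return input.stream().map(f).collect(Collectors.toList());
    }

    // Parallel: splits the elements across common-pool workers, roughly what .par.map does in Scala.
    // collect(Collectors.toList()) still returns the results in input order.
    static <A, B> List<B> mapParallel(List<A> input, Function<A, B> f) {
        return input.parallelStream().map(f).collect(Collectors.toList());
    }
}
```

When the mapped function blocks, for example a send followed by a wait on its result, the parallel version overlaps several of those waits instead of performing them one after another. Blocking work also ties up threads of the shared common pool, so a dedicated executor may be a better fit for long-running jobs.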

Related articles

How to enable Dropwizard Metrics in Play 2.5 HikariCP?

How to Enable KubeAPI server for HPA Autoscaling Metrics

How to Improve Performance of Kafka Producer when used in Synchronous Mode

Improving performance of Kafka Producer

Kafka Authentication Producer Unable to Connect Producer

Kafka producer sending invalid characters

kafka: keep track of producer offset

Kafka Producer design - multiple topics

Invalid Timestamp when writing a Kafka producer with sarama

How do you link both spark and kafka in sbt?

How to read from specific Kafka partition in Spark structured streaming

How do I reduce a spark dataframe from kafka and collect the result?

How to distribute data evenly in Kafka producing messages through Spark?

How to partition Spark dataset using a field from Kafka input

How to read stream nested JSON from kafka in Spark using Java

How to get at least N number of logs from Kafka through Spark?

ExceptionInInitializerError Spark Streaming Kafka

Spark Kafka Streaming Issue

Kafka Producer Callback performance

Kafka producer and consumer on separate computers aren't communicating

Spring-Kafka Integration 1.0.0.RELEASE Issue with Producer

Kafka 2.9.1 producer 0.8.2.1 compile vs runtime dependencies

Java/Scala Kafka Producer does not send message to topic

Spring cloud stream kafka metrics are not shown in actuator metrics | Springboot 2.2.2 | SpringCloud Hoxton.SR4

How to expose webClient metrics in prometheus?

How to access metrics of streaming query?

I want to enable Kafka errorhandling by annotation

How to fix "java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord" in Spark Streaming Kafka Consumer?
