How to use kafka.group.id in spark 3.0 to avoid always start the offset for latest position?

yyuankm

Based on the introduction in Spark 3.0, https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html. It should be possible to set "kafka.group.id" to track the offset. For our use case, I want to avoid the potential data loss if the streaming spark job failed and restart. Based on my previous questions, I have a feeling that kafka.group.id in Spark 3.0 is something that will help.

How to specify the group id of kafka consumer for spark structured streaming?

How to ensure no data loss for kafka data ingestion through Spark Structured Streaming?

However, I tried the settings in spark 3.0 as below.

package com.example

/**
 * @author ${user.name}
 */
import scala.math.random

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, BooleanType, LongType}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode
import org.apache.spark.SparkFiles
import java.util.Properties
import org.postgresql.Driver
import org.apache.spark.sql.streaming.Trigger
import java.time.Instant
import org.apache.hadoop.fs.{FileSystem, Path}
import java.net.URI
import java.sql.Connection
import java.sql.DriverManager
import java.sql.ResultSet
import java.sql.SQLException
import java.sql.Statement


//import org.apache.spark.sql.hive.HiveContext

import scala.io.Source

import java.nio.charset.StandardCharsets

import com.amazonaws.services.kms.{AWSKMS, AWSKMSClientBuilder}
import com.amazonaws.services.kms.model.DecryptRequest
import java.nio.ByteBuffer
import com.google.common.io.BaseEncoding


object App {
    
    def main(args: Array[String]): Unit = {
      
      val spark: SparkSession = SparkSession.builder()
        .appName("MY-APP")
        .getOrCreate()

      import spark.sqlContext.implicits._

      spark.catalog.clearCache()
      spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
      spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

      spark.sparkContext.setLogLevel("ERROR")
      spark.sparkContext.setCheckpointDir("/home/ec2-user/environment/spark/spark-local/checkpoint")
      
      System.gc()
      
      val df = spark.readStream
        .format("kafka")
          .option("kafka.bootstrap.servers", "mybroker.io:6667")
          .option("subscribe", "mytopic")
          .option("kafka.security.protocol", "SASL_SSL")
          .option("kafka.ssl.truststore.location", "/home/ec2-user/environment/spark/spark-local/creds/cacerts")
          .option("kafka.ssl.truststore.password", "changeit")
          .option("kafka.ssl.truststore.type", "JKS")
          .option("kafka.sasl.kerberos.service.name", "kafka")
          .option("kafka.sasl.mechanism", "GSSAPI")
          .option("kafka.group.id","MYID")
          .load()

      df.printSchema()

      
      val schema = new StructType()
        .add("id", StringType)
        .add("x", StringType)
        .add("eventtime", StringType)

      val idservice = df.selectExpr("CAST(value AS STRING)")
        .select(from_json(col("value"), schema).as("data"))
        .select("data.*")

       
      val monitoring_df = idservice
                .selectExpr("cast(id as string) id", 
                            "cast(x as string) x",
                            "cast(eventtime as string) eventtime")              

      val monitoring_stream = monitoring_df.writeStream
                              .trigger(Trigger.ProcessingTime("120 seconds"))
                              .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
                                if(!batchDF.isEmpty) 
                                {
                                    batchDF.persist()
                                    printf("At %d, the %dth microbatch has %d records and %d partitions \n", Instant.now.getEpochSecond, batchId, batchDF.count(), batchDF.rdd.partitions.size)                                    
                                    batchDF.show()

                                    batchDF.write.mode(SaveMode.Overwrite).option("path", "/home/ec2-user/environment/spark/spark-local/tmp").saveAsTable("mytable")
                                    spark.catalog.refreshTable("mytable")
                                    
                                    batchDF.unpersist()
                                    spark.catalog.clearCache()
                                }
                            }
                            .start()
                            .awaitTermination()
    }
   
}

The spark job is tested in the standalone mode by using below spark-submit command, but the same problem exists when I deploy in cluster mode in AWS EMR.

spark-submit --master local[1] --files /home/ec2-user/environment/spark/spark-local/creds/client_jaas.conf,/home/ec2-user/environment/spark/spark-localreds/cacerts,/home/ec2-user/environment/spark/spark-local/creds/krb5.conf,/home/ec2-user/environment/spark/spark-local/creds/my.keytab --driver-java-options "-Djava.security.auth.login.config=/home/ec2-user/environment/spark/spark-local/creds/client_jaas.conf -Djava.security.krb5.conf=/home/ec2-user/environment/spark/spark-local/creds/krb5.conf" --conf spark.dynamicAllocation.enabled=false --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/ec2-user/environment/spark/spark-local/creds/client_jaas.conf -Djava.security.krb5.conf=/home/ec2-user/environment/spark/spark-local/creds/krb5.conf" --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/home/ec2-user/environment/spark/spark-local/creds/client_jaas.conf -Djava.security.krb5.conf=/home/ec2-user/environment/spark/spark-local/creds/krb5.conf" --conf spark.yarn.maxAppAttempts=1000 --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 --class com.example.App ./target/sparktest-1.0-SNAPSHOT-jar-with-dependencies.jar

Then, I started the streaming job to read the streaming data from Kafka topic. After some time, I killed the spark job. Then, I wait for 1 hour to start the job again. If I understand correctly, the new streaming data should start from the offset when I killed the spark job. However, it still starts as the latest offset, which caused data loss during the time I stopped the job.

Do I need to configure more options to avoid data loss? Or do I have some misunderstanding for the Spark 3.0? Thanks!

Problem solved

The key issue here is that the checkpoint must be added to the query specifically. To just add checkpoint for SparkContext is not enough. After adding the checkpoint, it is working. In the checkpoint folder, it will create a offset subfolder, which contains offset file, 0, 1, 2, 3.... For each file, it will show the offset information for different partition.

{"8":109904920,"2":109905750,"5":109905789,"4":109905621,"7":109905330,"1":109905746,"9":109905750,"3":109905936,"6":109905531,"0":109905583}}

One suggestion is to put the checkpoint to some external storage, such as s3. It can help recover the offset even when you need to rebuild the EMR cluster itself in case.

mike

According to the Spark Structured Integration Guide, Spark itself is keeping track of the offsets and there are no offsets committed back to Kafka. That means if your Spark Streaming job fails and you restart it all necessary information on the offsets is stored in Spark's checkpointing files.

Even if you set the ConsumerGroup name with kafka.group.id, your application will still not commit the messages back to Kafka. The information on the next offset to read is only available in the checkpointing files of your Spark application.

If you stop and restart your application without a re-deployment and ensure that you do not delete old checkpoint files, your application will continue reading from where it left off.

In the Spark Structured Streaming documentation on Recovering from Failures with Checkpointing it is written that:

"In case of a failure or intentional shutdown, you can recover the previous progress and state of a previous query, and continue where it left off. This is done using checkpointing and write-ahead logs. You can configure a query with a checkpoint location, and the query will save all the progress information (i.e. range of offsets processed in each trigger) [...]"

This can be achieved by setting the following option in your writeStream query (it is not sufficient to set the checkpoint directory in your SparkContext configurations):

.option("checkpointLocation", "path/to/HDFS/dir")

In the docs it is also noted that "This checkpoint location has to be a path in an HDFS compatible file system, and can be set as an option in the DataStreamWriter when starting a query."

In addition, the fault tolerance capabilities of Spark Structured Streaming also depends on your output sink as described in section Output Sinks.

As you are currently using the ForeachBatch Sink, you might not have restart capabilities in your application.

enter image description here

Collected from the Internet

Please contact [email protected] to delete if infringement.

edited at
0

Comments

0 comments
Login to comment

Related

From Dev

Spring Kafka - How to reset offset to latest with a group id?

From Dev

How to fetch offset id while consuming Kafka from Spark, save it in Cassandra and use it to restart Kafka?

From Dev

How to fetch offset id while consuming Kafka from Spark, save it in Cassandra and use it to restart Kafka?

From Dev

How to save latest offset that Spark consumed to ZK or Kafka and can read back after restart

From Dev

How to get latest offset for a partition for a kafka topic?

From Dev

How to avoid continuous "Resetting offset" and "Seeking to LATEST offset"?

From Dev

Manually set offset for kafka group id

From Dev

How to set offset committed by the consumer group using Spark's Direct Stream for Kafka?

From Dev

Make apache storm topology to use latest offset from kafka

From Dev

How can I get the LATEST offset of a kafka topic?

From Dev

Kafka consumer group offset retention

From Dev

Use CSS3 translate instead of offset position top/left

From Dev

Reading latest in spark kafka streaming

From Dev

position/offset of div returning as 0

From Dev

How to find out the latest offset of a Kafka topic to know when my reader is up-to-date with topic?

From Dev

How to get latest offset/size of a Kafka topic using KafkaAdminClient (Java) for 2.x version

From Dev

How to group genes regarding their id and position , python

From Dev

How to make Windows use always the latest CLR for .Net assemblies?

From Dev

How do I use a switch to offset the start of a for loop?

From Dev

Spark Structured Streaming - kafka offset handling

From Dev

Spark Structured Streaming - kafka offset handling

From Dev

Spark Streaming not honoring auto.offset.reset="smallest" or group.id?

From Dev

Kafka Stream offset reset to zero for consumer group

From Dev

how to specify consumer group in Kafka Spark Streaming using direct stream

From Dev

AutoCompleteTextView OnItemClick position is always "0"

From Dev

RecyclerView position in getItemViewType() is always 0

From Dev

Field _id is always 0

From Dev

How is Apache Kafka offset generated?

From Dev

Absolute Position with 0 offset in all directions

Related Related

  1. 1

    Spring Kafka - How to reset offset to latest with a group id?

  2. 2

    How to fetch offset id while consuming Kafka from Spark, save it in Cassandra and use it to restart Kafka?

  3. 3

    How to fetch offset id while consuming Kafka from Spark, save it in Cassandra and use it to restart Kafka?

  4. 4

    How to save latest offset that Spark consumed to ZK or Kafka and can read back after restart

  5. 5

    How to get latest offset for a partition for a kafka topic?

  6. 6

    How to avoid continuous "Resetting offset" and "Seeking to LATEST offset"?

  7. 7

    Manually set offset for kafka group id

  8. 8

    How to set offset committed by the consumer group using Spark's Direct Stream for Kafka?

  9. 9

    Make apache storm topology to use latest offset from kafka

  10. 10

    How can I get the LATEST offset of a kafka topic?

  11. 11

    Kafka consumer group offset retention

  12. 12

    Use CSS3 translate instead of offset position top/left

  13. 13

    Reading latest in spark kafka streaming

  14. 14

    position/offset of div returning as 0

  15. 15

    How to find out the latest offset of a Kafka topic to know when my reader is up-to-date with topic?

  16. 16

    How to get latest offset/size of a Kafka topic using KafkaAdminClient (Java) for 2.x version

  17. 17

    How to group genes regarding their id and position , python

  18. 18

    How to make Windows use always the latest CLR for .Net assemblies?

  19. 19

    How do I use a switch to offset the start of a for loop?

  20. 20

    Spark Structured Streaming - kafka offset handling

  21. 21

    Spark Structured Streaming - kafka offset handling

  22. 22

    Spark Streaming not honoring auto.offset.reset="smallest" or group.id?

  23. 23

    Kafka Stream offset reset to zero for consumer group

  24. 24

    how to specify consumer group in Kafka Spark Streaming using direct stream

  25. 25

    AutoCompleteTextView OnItemClick position is always "0"

  26. 26

    RecyclerView position in getItemViewType() is always 0

  27. 27

    Field _id is always 0

  28. 28

    How is Apache Kafka offset generated?

  29. 29

    Absolute Position with 0 offset in all directions

HotTag

Archive