Background: I wrote a simple Spark Structured Streaming application to move data from Kafka to S3. I found that, in order to support exactly-once guarantees, Spark creates a _spark_metadata folder that keeps growing; when the streaming application runs for a long time, this metadata folder gets so large that we start getting OOM errors. I want to get rid of Structured Streaming's metadata and checkpoint folders and manage the offsets myself.
How we managed offsets in Spark Streaming: in the DStream API I used val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges to fetch the offsets. But how do I fetch the offsets and other metadata in Spark Structured Streaming, so that we can manage our own checkpoints? Do you have a sample program that implements such checkpointing?
How do we manage offsets in Spark Structured Streaming? I looked at this JIRA: https://issues-test.apache.org/jira/browse/SPARK-18258. It seems the offsets are not exposed there. What should we do?
The problem is that the metadata grew to 45 MB within 6 hours and kept growing until it was close to 13 GB. The driver has 5 GB of memory allocated, and at that point the system crashed with an OOM. How can I keep this metadata from growing so large? Can I make it record less information?
Code:
1. Read records from the Kafka topic:
Dataset<Row> inputDf = spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "topic1")
    .option("startingOffsets", "earliest")
    .load();
2. Use Spark's from_json API to extract the payload for further transformation:
Dataset<Row> dataDf = inputDf
    .select(from_json(col("value").cast("string"), EVENT_SCHEMA).alias("event"))
    .withColumn("oem_id", col("event.metadata.oem_id"));
3. Register the above dataset as a temp table using SQLContext:
SQLContext sqlContext = new SQLContext(sparkSession);
dataDf.createOrReplaceTempView("event");
4. Flatten the events before writing, since we do not want hierarchical data in the Parquet output (this is what the flattenSchema call in the listing below does).
5. Store the output in Parquet format on S3. The full listing:
Dataset<Row> dataDf = inputDf
    .select(from_json(col("value").cast("string"), EVENT_SCHEMA).alias("event"))
    .select("event.metadata", "event.data", "event.connection",
            "event.registration_event", "event.version_event");

SQLContext sqlContext = new SQLContext(sparkSession);
dataDf.createOrReplaceTempView("event");

Dataset<Row> flatDf = sqlContext.sql(
    "select date, time, id, " + flattenSchema(EVENT_SCHEMA, "event") + " from event");

StreamingQuery query = flatDf
    .writeStream()
    .outputMode("append")
    .option("compression", "snappy")
    .format("parquet")
    .option("checkpointLocation", checkpointLocation)
    .option("path", outputPath)
    .partitionBy("date", "time", "id")
    .trigger(Trigger.ProcessingTime(triggerProcessingTime))
    .start();

query.awaitTermination();
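flattenSchema is referenced above but not defined in the post. A minimal sketch of what such a helper could look like; the name, signature, and underscore aliasing scheme here are assumptions, not the poster's actual code:

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Assumed helper: recursively expands nested struct fields into a projection
// list such as "event.metadata.oem_id as metadata_oem_id".
public static String flattenSchema(StructType schema, String prefix) {
    List<String> columns = new ArrayList<>();
    for (StructField field : schema.fields()) {
        String qualified = prefix + "." + field.name();
        if (field.dataType() instanceof StructType) {
            columns.add(flattenSchema((StructType) field.dataType(), qualified));
        } else {
            // The alias drops the leading "event." and joins the path with underscores.
            String alias = qualified.substring(qualified.indexOf('.') + 1).replace(".", "_");
            columns.add(qualified + " as " + alias);
        }
    }
    return String.join(", ", columns);
}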
For non-batch Spark Structured Streaming KAFKA integration:
Quote:
Structured Streaming ignores the offsets commits in Apache Kafka.
Instead, it relies on its own offsets management on the driver side which is responsible for distributing offsets to executors and for checkpointing them at the end of the processing round (epoch or micro-batch).
You need not worry if you follow the Spark KAFKA integration guides.
Excellent reference: https://www.waitingforcode.com/apache-spark-structured-streaming/apache-spark-structured-streaming-apache-kafka-offsets-management/read
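If you nevertheless want to record the offsets of each micro-batch yourself (for monitoring, or to feed a custom restart mechanism), here is a minimal sketch using the StreamingQueryListener API. Where you persist the JSON offset strings (database, S3, ...) is up to you; spark is the SparkSession from the question:

import org.apache.spark.sql.streaming.SourceProgress;
import org.apache.spark.sql.streaming.StreamingQueryListener;

// Register before starting the query. onQueryProgress fires after every
// completed micro-batch and exposes the source offsets as JSON strings.
spark.streams().addListener(new StreamingQueryListener() {
    @Override
    public void onQueryStarted(QueryStartedEvent event) { }

    @Override
    public void onQueryProgress(QueryProgressEvent event) {
        for (SourceProgress source : event.progress().sources()) {
            // For Kafka, endOffset() looks like {"topic1":{"0":5000,"1":4999}}.
            // Printing is just for illustration; persist it wherever you like.
            System.out.println(source.description() + " -> " + source.endOffset());
        }
    }

    @Override
    public void onQueryTerminated(QueryTerminatedEvent event) { }
});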
For batch, the situation is different: you need to manage the offsets yourself and store them.
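For example, a batch read where the startingOffsets/endingOffsets JSON (per topic and partition) is what you would load from, and save back to, your own offset store; the numbers here are made up:

Dataset<Row> batchDf = spark
    .read()
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "topic1")
    // Offsets you persisted yourself, keyed by topic and partition.
    .option("startingOffsets", "{\"topic1\":{\"0\":1000,\"1\":1000}}")
    .option("endingOffsets", "{\"topic1\":{\"0\":2000,\"1\":2000}}")
    .load();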
UPDATE: Based on the comments, I would suggest the question is slightly different, and recommend you look at checkpoint cleanup for Spark Structured Streaming. In addition to your updated comments and the fact that there are no errors, I suggest you consult this on checkpoint storage in Spark Structured Streaming: https://www.waitingforcode.com/apache-spark-structured-streaming/checkpoint-storage-structured-streaming/read. I looked at your code; it is a different style than mine, but I cannot see any obvious errors.
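On the metadata growth itself: retention is controlled by a handful of internal, version-dependent settings. A sketch, assuming the Spark 2.x option names; verify them against your Spark version before relying on them:

SparkSession spark = SparkSession.builder()
    .appName("kafka-to-s3")
    // Keep fewer batches of checkpoint history (default 100).
    .config("spark.sql.streaming.minBatchesToRetain", 50)
    // Allow expired _spark_metadata log files to be deleted (default true).
    .config("spark.sql.streaming.fileSink.log.deletion", true)
    // Compact the file-sink log every N batches (default 10).
    .config("spark.sql.streaming.fileSink.log.compactInterval", 10)
    .getOrCreate();

Note that the file-sink compact file carries forward the name of every file ever written, so with a fine-grained partitionBy it can keep growing regardless; writing fewer, larger files also helps.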