我们如何在Spark结构化流中管理偏移量?(_spark_metadata问题)

炼金术士

背景:我编写了一个简单的Spark结构化蒸汽应用程序,用于将数据从Kafka迁移到S3。发现为了支持一次保证,spark创建了_spark_metadata文件夹,该文件夹最终变得太大,而当流媒体应用程序长时间运行时,元数据文件夹变得如此之大,以至于我们开始收到OOM错误。我想摆脱Spark结构化流的元数据和检查点文件夹,并自己管理偏移量。

我们如何管理Spark流中的偏移量:我已经使用val offsetRanges = rdd.asInstanceOf [HasOffsetRanges] .offsetRanges来获取Spark结构化流中的偏移量。但是想知道如何使用Spark结构化流技术获取偏移量和其他元数据来管理我们自己的检查点。您是否有实现检查点的示例程序?

我们如何在Spark结构化流中管理偏移量?查看此JIRA https://issues-test.apache.org/jira/browse/SPARK-18258好像没有提供偏移量。我们应该怎么做?

问题在于6小时内元数据的大小增加到45MB,并且一直增长到接近13GB。分配的驱动程序内存为5GB。那时,系统因OOM而崩溃。想知道如何避免使此元数据变得如此之大?如何使元数据不记录太多信息。

码:

1. Reading records from Kafka topic
  Dataset<Row> inputDf = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .option("startingOffsets", "earliest") \
  .load()
2. Use from_json API from Spark to extract your data for further transformation in a dataset.
   Dataset<Row> dataDf = inputDf.select(from_json(col("value").cast("string"), EVENT_SCHEMA).alias("event"))
       ....withColumn("oem_id", col("metadata.oem_id"));
3. Construct a temp table of above dataset using SQLContext
   SQLContext sqlContext = new SQLContext(sparkSession);
   dataDf.createOrReplaceTempView("event");
4. Flatten events since Parquet does not support hierarchical data.
5. Store output in parquet format on S3
   StreamingQuery query = flatDf.writeStream().format("parquet")

在此处输入图片说明

Dataset dataDf = inputDf.select(from_json(col("value").cast("string"), EVENT_SCHEMA).alias("event")) .select("event.metadata", "event.data", "event.connection", "event.registration_event","event.version_event" ); SQLContext sqlContext = new SQLContext(sparkSession); dataDf.createOrReplaceTempView("event"); Dataset flatDf = sqlContext .sql("select " + " date, time, id, " + flattenSchema(EVENT_SCHEMA, "event") + " from event"); StreamingQuery query = flatDf .writeStream() .outputMode("append") .option("compression", "snappy") .format("parquet") .option("checkpointLocation", checkpointLocation) .option("path", outputPath) .partitionBy("date", "time", "id") .trigger(Trigger.ProcessingTime(triggerProcessingTime)) .start(); query.awaitTermination();

thebluephantom

For non-batch Spark Structured Streaming KAFKA integration:

Quote:

Structured Streaming ignores the offsets commits in Apache Kafka.

Instead, it relies on its own offsets management on the driver side which is responsible for distributing offsets to executors and for checkpointing them at the end of the processing round (epoch or micro-batch).

You need not worry if you follow the Spark KAFKA integration guides.

Excellent reference: https://www.waitingforcode.com/apache-spark-structured-streaming/apache-spark-structured-streaming-apache-kafka-offsets-management/read

For batch the situation is different, you need to manage that yourself and store the offsets.

更新根据评论,我建议问题略有不同,建议您查看Spark结构化流式检查点清除除了更新的评论和没有错误的事实之外,我建议您在Spark结构化流https://www.waitingforcode.com/apache-spark-structured-streaming/checkpoint-storage-structured-流/读查看与我的风格不同的代码,但是看不到任何明显的错误。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

在使用Kafka的Spark结构化流媒体中,Spark如何管理多个主题的偏移

来自分类Dev

Spark结构化流-Kafka偏移处理

来自分类Dev

Spark结构化流-Kafka偏移处理

来自分类Dev

如何在(Py)Spark结构化流中捕获不正确的(损坏的)JSON记录?

来自分类Dev

清晰的偏移量引发来自Kafka的结构化流

来自分类Dev

如何将静态数据帧与Spark结构化流中的流数据进行比较?

来自分类Dev

如何在Spark结构化流中基于时间戳字段重复数据删除并保持最新?

来自分类Dev

如何删除Spark结构化流创建的旧数据?

来自分类Dev

Spark结构化流从查询异常中恢复

来自分类Dev

如何在Kafka Direct Stream中使用Spark结构化流?

来自分类Dev

Spark结构化流时,DataFrame中的字符串列如何拆分为多个列

来自分类Dev

在 _spark_metadata 中什么也没找到

来自分类Dev

Spark结构化流作业如何处理流-静态DataFrame连接?

来自分类Dev

Spark结构化流中的流数据帧读取模式

来自分类Dev

如何优化 Spark 结构化流应用程序中的执行程序实例数量?

来自分类Dev

显示Spark结构化流作业消耗的事件数

来自分类Dev

使用Spark结构化流检索图形信息

来自分类Dev

从Spark结构化流以JSON数组形式写入数据

来自分类Dev

如何找到Spark结构化流应用程序的使用者组ID?

来自分类Dev

如何使用Airflow重新启动失败的结构化流Spark作业?

来自分类Dev

从Kafka主题读取流时,Spark结构化流是否存在一些超时问题?

来自分类Dev

在带有水印和窗口聚合的Spark结构化流中运行多个查询

来自分类Dev

Spark结构化流应用中的执行者死亡

来自分类Dev

发送 Row.empty 时在 Spark 结构化流中获取 ArrayIndexOutOfBounds 异常

来自分类Dev

Spark 结构化流:JDBC 接收器中的主键

来自分类Dev

如何在Spring Kafka中禁用提交偏移量以在本地存储偏移量?

来自分类Dev

如何在Scala Spark中持久保存我们从dataFrame动态创建的列表

来自分类Dev

是否有Spark Arrow Streaming =箭头流+ Spark结构化流?

来自分类Dev

linux如何管理文件的偏移量

Related 相关文章

  1. 1

    在使用Kafka的Spark结构化流媒体中,Spark如何管理多个主题的偏移

  2. 2

    Spark结构化流-Kafka偏移处理

  3. 3

    Spark结构化流-Kafka偏移处理

  4. 4

    如何在(Py)Spark结构化流中捕获不正确的(损坏的)JSON记录?

  5. 5

    清晰的偏移量引发来自Kafka的结构化流

  6. 6

    如何将静态数据帧与Spark结构化流中的流数据进行比较?

  7. 7

    如何在Spark结构化流中基于时间戳字段重复数据删除并保持最新?

  8. 8

    如何删除Spark结构化流创建的旧数据?

  9. 9

    Spark结构化流从查询异常中恢复

  10. 10

    如何在Kafka Direct Stream中使用Spark结构化流?

  11. 11

    Spark结构化流时,DataFrame中的字符串列如何拆分为多个列

  12. 12

    在 _spark_metadata 中什么也没找到

  13. 13

    Spark结构化流作业如何处理流-静态DataFrame连接?

  14. 14

    Spark结构化流中的流数据帧读取模式

  15. 15

    如何优化 Spark 结构化流应用程序中的执行程序实例数量?

  16. 16

    显示Spark结构化流作业消耗的事件数

  17. 17

    使用Spark结构化流检索图形信息

  18. 18

    从Spark结构化流以JSON数组形式写入数据

  19. 19

    如何找到Spark结构化流应用程序的使用者组ID?

  20. 20

    如何使用Airflow重新启动失败的结构化流Spark作业?

  21. 21

    从Kafka主题读取流时,Spark结构化流是否存在一些超时问题?

  22. 22

    在带有水印和窗口聚合的Spark结构化流中运行多个查询

  23. 23

    Spark结构化流应用中的执行者死亡

  24. 24

    发送 Row.empty 时在 Spark 结构化流中获取 ArrayIndexOutOfBounds 异常

  25. 25

    Spark 结构化流:JDBC 接收器中的主键

  26. 26

    如何在Spring Kafka中禁用提交偏移量以在本地存储偏移量?

  27. 27

    如何在Scala Spark中持久保存我们从dataFrame动态创建的列表

  28. 28

    是否有Spark Arrow Streaming =箭头流+ Spark结构化流?

  29. 29

    linux如何管理文件的偏移量

热门标签

归档