使用Spark结构化流将数据写入JSON数组

库沙格拉·米塔尔

我必须将Spark Structure流中的数据写为JSON Array,我尝试使用以下代码:

df.selectExpr("to_json(struct(*)) AS value").toJSON

它返回我DataSet [String],但无法写为JSON Array。

电流输出:

{"name":"test","id":"id"}
{"name":"test1","id":"id1"}

预期产量:

[{"name":"test","id":"id"},{"name":"test1","id":"id1"}]

编辑(将评论移入问题):

使用建议的collect_list方法后,我得到

Exception in thread "main" org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;

然后我尝试了这样的事情-

withColumn("timestamp", unix_timestamp(col("event_epoch"), "MM/dd/yyyy hh:mm:ss aa")) .withWatermark("event_epoch", "1 minutes") .groupBy(col("event_epoch")) .agg(max(col("event_epoch")).alias("timestamp")) 

但我不想添加新列。

麦克风

您可以为此使用SQL内置函数collect_list此函数收集并返回一组非唯一元素(相比之下,collect_set该元素仅返回唯一元素)。

collect_list的源代码中,您将看到这是一个聚合函数。根据《结构化流输出模式编程指南》中有关输出模式的要求,强调了不带水印的聚合支持输出模式“完整”和“更新”。

在此处输入图片说明

根据您的评论,我不希望添加水印和新列。另外,您面临的错误

Exception in thread "main" org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark; 

提醒您不要使用输出模式“追加”。

在评论中,您提到计划将结果生成为Kafka消息。一个大JSON数组作为一个Kafka值。完整的代码看起来像

val df = spark.readStream
  .[...] // in my test I am reading from Kafka source
  .load()
  .selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value", "offset", "partition")
  // do not forget to convert you data into a String before writing to Kafka
  .selectExpr("CAST(collect_list(to_json(struct(*))) AS STRING) AS value")

df.writeStream
  .format("kafka")
  .outputMode("complete")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "test")
  .option("checkpointLocation", "/path/to/sparkCheckpoint")
  .trigger(Trigger.ProcessingTime(10000))
  .start()
  .awaitTermination()

给定键/值对(k1,v1),(k2,v2)和(k3,v3)作为输入,您将在Kafka主题中获得一个值,该值包含所有选定数据作为JSON数组:

[{"key":"k1","value":"v1","offset":7,"partition":0}, {"key":"k2","value":"v2","offset":8,"partition":0}, {"key":"k3","value":"v3","offset":9,"partition":0}]

已在Spark 3.0.1和Kafka 2.5.0中进行测试。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

从Spark结构化流以JSON数组形式写入数据

来自分类Dev

如何将静态数据帧与Spark结构化流中的流数据进行比较?

来自分类Dev

结构化流将 Parquet 文件写入 Hadoop

来自分类Dev

结构化流写入多个流

来自分类Dev

如何删除Spark结构化流创建的旧数据?

来自分类Dev

使用Spark结构化流检索图形信息

来自分类Dev

使用dictwriter并使用python将非结构化数据写入csv文件

来自分类Dev

结构化流如何动态解析kafka的json数据

来自分类Dev

将 JSON 保存到 HDFS 的结构化流

来自分类Dev

从Spark结构化流作业写入时的Delta表版本控制

来自分类Dev

使用 talend BigData 将半结构化数据转换为结构化数据

来自分类Dev

Spark结构化流中的流数据帧读取模式

来自分类Dev

Apache Spark:使用结构化数据是否好

来自分类Dev

在源中更新其基础数据时,结构化流中使用的Spark DataFrame会发生什么情况?

来自分类Dev

使用Java将非结构化数据(文本)转换为结构化格式

来自分类Dev

最好使用数组或对象存储结构化数据?

来自分类Dev

在Spark结构化流中将数据内部联接到左联接的DataFrame时丢失条目

来自分类Dev

将多个查询用于Spark结构化流的用例

来自分类Dev

如何在(Py)Spark结构化流中捕获不正确的(损坏的)JSON记录?

来自分类Dev

如何在Kafka Direct Stream中使用Spark结构化流?

来自分类Dev

如何找到Spark结构化流应用程序的使用者组ID?

来自分类Dev

如何使用Airflow重新启动失败的结构化流Spark作业?

来自分类Dev

使用套接字执行Spark结构化流,设置SCHEMA,在控制台中显示DATAFRAME

来自分类Dev

Spark结构化流-Kafka偏移处理

来自分类Dev

显示Spark结构化流作业消耗的事件数

来自分类Dev

Spark结构化流从查询异常中恢复

来自分类Dev

Spark结构化流-Kafka偏移处理

来自分类Dev

如何在 Python 中将 JSON 结构化数据写入文本文件?

来自分类Dev

JSON-结构化数据

Related 相关文章

  1. 1

    从Spark结构化流以JSON数组形式写入数据

  2. 2

    如何将静态数据帧与Spark结构化流中的流数据进行比较?

  3. 3

    结构化流将 Parquet 文件写入 Hadoop

  4. 4

    结构化流写入多个流

  5. 5

    如何删除Spark结构化流创建的旧数据?

  6. 6

    使用Spark结构化流检索图形信息

  7. 7

    使用dictwriter并使用python将非结构化数据写入csv文件

  8. 8

    结构化流如何动态解析kafka的json数据

  9. 9

    将 JSON 保存到 HDFS 的结构化流

  10. 10

    从Spark结构化流作业写入时的Delta表版本控制

  11. 11

    使用 talend BigData 将半结构化数据转换为结构化数据

  12. 12

    Spark结构化流中的流数据帧读取模式

  13. 13

    Apache Spark:使用结构化数据是否好

  14. 14

    在源中更新其基础数据时,结构化流中使用的Spark DataFrame会发生什么情况?

  15. 15

    使用Java将非结构化数据(文本)转换为结构化格式

  16. 16

    最好使用数组或对象存储结构化数据?

  17. 17

    在Spark结构化流中将数据内部联接到左联接的DataFrame时丢失条目

  18. 18

    将多个查询用于Spark结构化流的用例

  19. 19

    如何在(Py)Spark结构化流中捕获不正确的(损坏的)JSON记录?

  20. 20

    如何在Kafka Direct Stream中使用Spark结构化流?

  21. 21

    如何找到Spark结构化流应用程序的使用者组ID?

  22. 22

    如何使用Airflow重新启动失败的结构化流Spark作业?

  23. 23

    使用套接字执行Spark结构化流,设置SCHEMA,在控制台中显示DATAFRAME

  24. 24

    Spark结构化流-Kafka偏移处理

  25. 25

    显示Spark结构化流作业消耗的事件数

  26. 26

    Spark结构化流从查询异常中恢复

  27. 27

    Spark结构化流-Kafka偏移处理

  28. 28

    如何在 Python 中将 JSON 结构化数据写入文本文件?

  29. 29

    JSON-结构化数据

热门标签

归档