Spark结构化流中的流数据帧读取模式

basigow

我是Apache Spark结构化流媒体的新手。我正在尝试从事件中心(XML格式)读取一些事件,并尝试从嵌套XML创建新的Spark DF。

我正在使用https://github.com/databricks/spark-xml描述的代码示例,并且在批处理模式下可以完美运行,但在结构化Spark流中却无法运行。

spark-xml Github库的代码块

import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import spark.implicits._
val df = ... /// DataFrame with XML in column 'payload' 
val payloadSchema = schema_of_xml(df.select("payload").as[String])
val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))

我的批处理代码

val df = Seq(
  (8, "<AccountSetup xmlns:xsi=\"test\"><Customers test=\"a\">d</Customers><tag1>7</tag1> <tag2>4</tag2> <mode>0</mode> <Quantity>1</Quantity></AccountSetup>"),
  (64, "<AccountSetup xmlns:xsi=\"test\"><Customers test=\"a\">d</Customers><tag1>6</tag1> <tag2>4</tag2>  <mode>0</mode> <Quantity>1</Quantity></AccountSetup>"),
  (27, "<AccountSetup xmlns:xsi=\"test\"><Customers test=\"a\">d</Customers><tag1>4</tag1> <tag2>4</tag2> <mode>3</mode> <Quantity>1</Quantity></AccountSetup>")
).toDF("number", "body")
)


val payloadSchema = schema_of_xml(df.select("body").as[String])
val parsed = df.withColumn("parsed", from_xml($"body", payloadSchema))

val final_df = parsed.select(parsed.col("parsed"))
display(final_df.select("parsed.*"))

在此处输入图片说明

我正在尝试为Spark结构化流执行相同的逻辑,例如以下代码:

结构化流代码

import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import org.apache.spark.eventhubs.{ ConnectionStringBuilder, EventHubsConf, EventPosition }
import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._


val streamingInputDF = 
  spark.readStream
    .format("eventhubs")
    .options(eventHubsConf.toMap)
    .load()

val payloadSchema = schema_of_xml(streamingInputDF.select("body").as[String])
val parsed = streamingSelectDF.withColumn("parsed", from_xml($"body", payloadSchema))
val final_df = parsed.select(parsed.col("parsed"))

display(final_df.select("parsed.*"))

在指令的代码部分val payloadSchema = schema_of_xml(streamingInputDF.select("body").as[String])抛出错误 Queries with streaming sources must be executed with writeStream.start();;

更新资料

试着


val streamingInputDF = 
  spark.readStream
    .format("eventhubs")
    .options(eventHubsConf.toMap)
    .load()
    .select(($"body").cast("string"))

val body_value = streamingInputDF.select("body").as[String]
body_value.writeStream
    .format("console")
    .start()

spark.streams.awaitAnyTermination()


val payloadSchema = schema_of_xml(body_value)
val parsed = body_value.withColumn("parsed", from_xml($"body", payloadSchema))
val final_df = parsed.select(parsed.col("parsed"))

现在没有遇到错误,但Databricks处于“等待状态”

在此处输入图片说明 谢谢!!

麦克风

如果代码在批处理模式下工作,则没有任何问题。

重要的是,不仅要通过使用readStream将源转换为流load而且还需要将接收器部分转换为流。

您收到的错误消息只是提醒您也要查看接收器部分。您的数据框final_df实际上是一个流式数据框,必须从开始start

《结构化流指南》为您提供了所有可用输出接收器的良好概述,最简单的方法是将结果打印到控制台。

总而言之,您需要在程序中添加以下内容:

final_df.writeStream
    .format("console")
    .start()

spark.streams.awaitAnyTermination()

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

如何将静态数据帧与Spark结构化流中的流数据进行比较?

来自分类Dev

如何计算 ApacheSpark 结构化流中数据帧 API 的 z 分数?

来自分类Dev

如何删除Spark结构化流创建的旧数据?

来自分类Dev

从Spark结构化流以JSON数组形式写入数据

来自分类Dev

从 Kafka 读取时 Pyspark 结构化流中的异常

来自分类Dev

Spark结构化流从查询异常中恢复

来自分类Dev

结构化流 - 加入来自同一流源的 2 个数据帧

来自分类Dev

如何添加新文件以激发结构化流数据帧

来自分类Dev

从Kafka读取的Spark结构化流应用程序仅返回空值

来自分类Dev

在源中更新其基础数据时,结构化流中使用的Spark DataFrame会发生什么情况?

来自分类Dev

如何在Spark结构化流中基于时间戳字段重复数据删除并保持最新?

来自分类Dev

从Kafka主题读取文件路径,然后在结构化流中读取文件并写入DeltaLake

来自分类Dev

从Kafka主题读取流时,Spark结构化流是否存在一些超时问题?

来自分类Dev

结构化流写入多个流

来自分类Dev

在Spark结构化流中将数据内部联接到左联接的DataFrame时丢失条目

来自分类Dev

使用Spark结构化流将数据写入JSON数组

来自分类Dev

Spark结构化流-Kafka偏移处理

来自分类Dev

显示Spark结构化流作业消耗的事件数

来自分类Dev

使用Spark结构化流检索图形信息

来自分类Dev

Spark结构化流-Kafka偏移处理

来自分类Dev

如何在(Py)Spark结构化流中捕获不正确的(损坏的)JSON记录?

来自分类Dev

在带有水印和窗口聚合的Spark结构化流中运行多个查询

来自分类Dev

Spark结构化流时,DataFrame中的字符串列如何拆分为多个列

来自分类Dev

Spark结构化流应用中的执行者死亡

来自分类Dev

发送 Row.empty 时在 Spark 结构化流中获取 ArrayIndexOutOfBounds 异常

来自分类Dev

Spark 结构化流:JDBC 接收器中的主键

来自分类Dev

结构化流如何动态解析kafka的json数据

来自分类Dev

是否有Spark Arrow Streaming =箭头流+ Spark结构化流?

来自分类Dev

通过使用两个不同的Spark结构化流读取同一主题来调试Kafka管道

Related 相关文章

  1. 1

    如何将静态数据帧与Spark结构化流中的流数据进行比较?

  2. 2

    如何计算 ApacheSpark 结构化流中数据帧 API 的 z 分数?

  3. 3

    如何删除Spark结构化流创建的旧数据?

  4. 4

    从Spark结构化流以JSON数组形式写入数据

  5. 5

    从 Kafka 读取时 Pyspark 结构化流中的异常

  6. 6

    Spark结构化流从查询异常中恢复

  7. 7

    结构化流 - 加入来自同一流源的 2 个数据帧

  8. 8

    如何添加新文件以激发结构化流数据帧

  9. 9

    从Kafka读取的Spark结构化流应用程序仅返回空值

  10. 10

    在源中更新其基础数据时,结构化流中使用的Spark DataFrame会发生什么情况?

  11. 11

    如何在Spark结构化流中基于时间戳字段重复数据删除并保持最新?

  12. 12

    从Kafka主题读取文件路径,然后在结构化流中读取文件并写入DeltaLake

  13. 13

    从Kafka主题读取流时,Spark结构化流是否存在一些超时问题?

  14. 14

    结构化流写入多个流

  15. 15

    在Spark结构化流中将数据内部联接到左联接的DataFrame时丢失条目

  16. 16

    使用Spark结构化流将数据写入JSON数组

  17. 17

    Spark结构化流-Kafka偏移处理

  18. 18

    显示Spark结构化流作业消耗的事件数

  19. 19

    使用Spark结构化流检索图形信息

  20. 20

    Spark结构化流-Kafka偏移处理

  21. 21

    如何在(Py)Spark结构化流中捕获不正确的(损坏的)JSON记录?

  22. 22

    在带有水印和窗口聚合的Spark结构化流中运行多个查询

  23. 23

    Spark结构化流时,DataFrame中的字符串列如何拆分为多个列

  24. 24

    Spark结构化流应用中的执行者死亡

  25. 25

    发送 Row.empty 时在 Spark 结构化流中获取 ArrayIndexOutOfBounds 异常

  26. 26

    Spark 结构化流:JDBC 接收器中的主键

  27. 27

    结构化流如何动态解析kafka的json数据

  28. 28

    是否有Spark Arrow Streaming =箭头流+ Spark结构化流?

  29. 29

    通过使用两个不同的Spark结构化流读取同一主题来调试Kafka管道

热门标签

归档