我是Apache Spark结构化流媒体的新手。我正在尝试从事件中心(XML格式)读取一些事件,并尝试从嵌套XML创建新的Spark DF。
我正在使用https://github.com/databricks/spark-xml中描述的代码示例,并且在批处理模式下可以完美运行,但在结构化Spark流中却无法运行。
spark-xml Github库的代码块
import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import spark.implicits._
val df = ... /// DataFrame with XML in column 'payload'
val payloadSchema = schema_of_xml(df.select("payload").as[String])
val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))
我的批处理代码
val df = Seq(
(8, "<AccountSetup xmlns:xsi=\"test\"><Customers test=\"a\">d</Customers><tag1>7</tag1> <tag2>4</tag2> <mode>0</mode> <Quantity>1</Quantity></AccountSetup>"),
(64, "<AccountSetup xmlns:xsi=\"test\"><Customers test=\"a\">d</Customers><tag1>6</tag1> <tag2>4</tag2> <mode>0</mode> <Quantity>1</Quantity></AccountSetup>"),
(27, "<AccountSetup xmlns:xsi=\"test\"><Customers test=\"a\">d</Customers><tag1>4</tag1> <tag2>4</tag2> <mode>3</mode> <Quantity>1</Quantity></AccountSetup>")
).toDF("number", "body")
)
val payloadSchema = schema_of_xml(df.select("body").as[String])
val parsed = df.withColumn("parsed", from_xml($"body", payloadSchema))
val final_df = parsed.select(parsed.col("parsed"))
display(final_df.select("parsed.*"))
我正在尝试为Spark结构化流执行相同的逻辑,例如以下代码:
结构化流代码
import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import org.apache.spark.eventhubs.{ ConnectionStringBuilder, EventHubsConf, EventPosition }
import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
val streamingInputDF =
spark.readStream
.format("eventhubs")
.options(eventHubsConf.toMap)
.load()
val payloadSchema = schema_of_xml(streamingInputDF.select("body").as[String])
val parsed = streamingSelectDF.withColumn("parsed", from_xml($"body", payloadSchema))
val final_df = parsed.select(parsed.col("parsed"))
display(final_df.select("parsed.*"))
在指令的代码部分val payloadSchema = schema_of_xml(streamingInputDF.select("body").as[String])
抛出错误 Queries with streaming sources must be executed with writeStream.start();;
更新资料
试着
val streamingInputDF =
spark.readStream
.format("eventhubs")
.options(eventHubsConf.toMap)
.load()
.select(($"body").cast("string"))
val body_value = streamingInputDF.select("body").as[String]
body_value.writeStream
.format("console")
.start()
spark.streams.awaitAnyTermination()
val payloadSchema = schema_of_xml(body_value)
val parsed = body_value.withColumn("parsed", from_xml($"body", payloadSchema))
val final_df = parsed.select(parsed.col("parsed"))
现在没有遇到错误,但Databricks处于“等待状态”
如果代码在批处理模式下工作,则没有任何问题。
重要的是,不仅要通过使用readStream
和将源转换为流,load
而且还需要将接收器部分转换为流。
您收到的错误消息只是提醒您也要查看接收器部分。您的数据框final_df
实际上是一个流式数据框,必须从开始start
。
《结构化流指南》为您提供了所有可用输出接收器的良好概述,最简单的方法是将结果打印到控制台。
总而言之,您需要在程序中添加以下内容:
final_df.writeStream
.format("console")
.start()
spark.streams.awaitAnyTermination()
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句