我正在创建一个火花流作业,它从 Kafka 主题读取 JSON 消息。对于我从 Dstream 获得的每个 RDD,我正在创建一个数据帧。我的要求是将此数据帧写入 hdfs 路径。在写入之前,我需要检查此消息的架构是否格式正确。因此,我创建了一个 StructType customSchema,其预期字段的顺序与 Kafka 主题中的 JSON 消息中的顺序相同。我试图比较两者,但这不起作用。即使所有字段都以正确的顺序存在,结果为假。
我有一个与 Kafka 主题格式相同的 json 文件。
{"transactionId":"12345","accountName":"XXX1","sessionKey":"WEB","description":"INR"}
我在它周围创建了一个数据框
val df=spark.read.json("/data/path/sample/")
scala> df.printSchema
root
|-- accountName: string (nullable = true)
|-- description: string (nullable = true)
|-- sessionKey: string (nullable = true)
|-- transactionId: string (nullable = true)
注意:当我打印我创建的 df 模式时,它是按字母顺序打印的。
我创建了一个 StructType 的 customSchema
scala> val schema1= StructType( Array (StructField("transactionId",StringType, true),StructField("accountName",StringType, true),StructField("sessionKey",StringType, true),StructField("description",StringType, true)))
schema1: org.apache.spark.sql.types.StructType = StructType(StructField(transactionId,StringType,true), StructField(accountName,StringType,true), StructField(sessionKey,StringType,true), StructField(description,StringType,true))
当我尝试匹配它时,结果为 false
scala> val d=df.schema==schema1
d: Boolean = false
使用 .equals 方法也会导致结果为 false 现在,如果我以与 printSchema 打印相同的方式创建 customSchema,
scala> val schema2= StructType( Array (StructField("accountName",StringType, true),StructField("description",StringType, true),StructField("sessionKey",StringType, true),StructField("transactionId",StringType, true)))
schema2: org.apache.spark.sql.types.StructType = StructType(StructField(accountName,StringType,true), StructField(description,StringType,true), StructField(sessionKey,StringType,true), StructField(transactionId,StringType,true))
如果我现在比较两者,它会按预期正常工作。
scala> val j=df.schema==schema2
j: Boolean = true
因此,根据我的观察,spark 是否会按字母顺序在内部重新排列字段的顺序,因为我在文档中找不到它。
如果未提供,Spark Json 会推断架构。如果需要按特定顺序排列select
是更好的选择。
val colsArr = Array("col1","col2","col3")
val df = df.select(colsArr.head,colsArr.tail:_*)
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句