I am trying to write some data to BigQuery using Spark Scala. My Spark df looks like:
root
|-- id: string (nullable = true)
|-- cost: double (nullable = false)
|-- nodes: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- settled: string (nullable = true)
| | |-- constant: string (nullable = true)
|-- status: string (nullable = true)
I tried to change the structure of the dataframe:
val schema = StructType(Array(
  StructField("id", StringType, true),
  StructField("cost", DoubleType, true),
  StructField("nodes", StructType(Array(
    StructField("settled", StringType),
    StructField("constant", StringType)))),
  StructField("status", StringType, true)))
val actualDf = spark.createDataFrame(results, schema)
But that didn't work. When writing it to BigQuery, the column names come out as:
id, cost, nodes.list.element.settled, nodes.list.element.constant, status
Is it possible to change these column names to:
id, cost, settled, constant, status
You can explode the nodes array into a flat column structure, and then write the dataframe to BigQuery.
Example:
import spark.implicits._  // needed for .toDS and the 'nodes column syntax
import org.apache.spark.sql.functions.explode

val jsn_ds = Seq("""{"id":1, "cost": "2.0","nodes":[{"settled":"u","constant":"p"}],"status":"s"}""").toDS
spark.read.json(jsn_ds).printSchema
// root
// |-- cost: string (nullable = true)
// |-- id: long (nullable = true)
// |-- nodes: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- constant: string (nullable = true)
// | | |-- settled: string (nullable = true)
// |-- status: string (nullable = true)
spark.read.json(jsn_ds).
withColumn("expld",explode('nodes)).
select("*","expld.*").
drop("expld","nodes").
show()
//+----+---+------+--------+-------+
//|cost| id|status|constant|settled|
//+----+---+------+--------+-------+
//| 2.0| 1| s| p| u|
//+----+---+------+--------+-------+
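Since the question also asks for specific output column names, a minimal sketch of the same explode approach with an explicit select that keeps only the flattened columns in the desired order (the sample JSON, the local SparkSession, and the table name `dataset.table` are assumptions for illustration; the `format("bigquery")` write assumes the spark-bigquery connector is on the classpath and configured):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode

val spark = SparkSession.builder.master("local[*]").appName("flatten").getOrCreate()
import spark.implicits._

// Sample data shaped like the schema in the question
val jsn = Seq("""{"id":"a1","cost":2.0,"nodes":[{"settled":"u","constant":"p"}],"status":"s"}""").toDS

// Explode the array, then select the struct fields directly so the
// final columns are plain top-level names, not nodes.list.element.*
val flat = spark.read.json(jsn)
  .withColumn("expld", explode('nodes))
  .select('id, 'cost, $"expld.settled".as("settled"), $"expld.constant".as("constant"), 'status)

flat.printSchema
// flat.write.format("bigquery").option("table", "dataset.table").save()  // hypothetical target table
```

Note that if an id can have several elements in nodes, explode produces one output row per element, so id and cost are repeated across those rows.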