我从外部文件创建了一个数据框 DF,它具有以下架构:
(id, field1, field2, field3) 分区列:id
数据示例是
000, 11_field1, 22_field2, 33_field3
001, 111_field1, 222_field2, 333_field3
我想从 DF 创建另一个数据框,其架构是
(id, fieleName, fieldValue)
数据示例是
000, field1, 11_field1
000, field2, 22_field2
000, field3, 33_field3
001, field1, 111_field1
001, field2, 222_field2
001, field3, 333_field3
谁能告诉我如何获取新的数据框?
您可以pyspark
使用以下explode
选项实现此目的
首先导入必要的库和函数
from pyspark.sql import SQLContext, Row
假设您的数据框是df
.
如果你这样做 df.show()
你应该得到如下结果
+---+----------+----------+----------+
| id| field1| field2| field3|
+---+----------+----------+----------+
| 0| 11_field1| 22_field2| 33_field3|
| 1|111_field1|222_field2|333_field3|
+---+----------+----------+----------+
然后将要分解的所有列映射为 2 列。在这里,您希望除 id 之外的所有列爆炸。因此,请执行以下操作
cols= df.columns[1:]
然后将其转换data frame
为rdd
如下所示
rdd = data.rdd.map(lambda x: Row(id=x[0], val=dict(zip(cols, x[1:]))))
要检查 rdd 是如何映射的,请执行以下操作
rdd.take()
你会得到如下结果
[Row(id=0, val={'field2': u'22_field2', 'field3': u'33_field3', 'field1': u'11_field1'}), Row(id=1, val={'field2': u'222_field2', 'field3': u'333_field3', 'field1': u'111_field1'})]
然后将rdd
返回转换为data frame
saydf2
df2 = sqlContext.createDataFrame(rdd)
然后做df2.show()
。你应该得到如下结果
+---+--------------------+
| id| val|
+---+--------------------+
| 0|Map(field3 -> 33_...|
| 1|Map(field3 -> 333...|
+---+--------------------+
然后将数据帧 df2 注册为临时表
df2.registerTempTable('mytempTable')
然后在数据框上运行如下查询:
df3 = sqlContext.sql( """select id,explode(val) AS (fieldname,fieldvalue) from mytempTable""")
然后做df3.show()
,你应该得到如下结果
+---+---------+----------+
| id|fieldname|fieldvalue|
+---+---------+----------+
| 0| field3| 33_field3|
| 0| field2| 22_field2|
| 0| field1| 11_field1|
| 1| field3|333_field3|
| 1| field2|222_field2|
| 1| field1|111_field1|
+---+---------+----------+
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句