我有这个输入DataFrame
input_df:
| C1 | C2 | C3 |
| ------------- |
| A | 1 | 12/06/2012 |
| A | 2 | 13/06/2012 |
| B | 3 | 12/06/2012 |
| B | 4 | 2012年6月17日|
| C | 5 | 14/06/2012 |
| ---------- |
转换后,我想按C1进行这种DataFrame分组,并创建C4列,其列由C2和C3的一对列表构成
output_df:
| C1 | C4 |
| --------------------------------------------- |
| A | (1,12/06/2012),(2,12/06/2012)|
| B | (3,12/06/2012),(4,12/06/2012)|
| C | (5,12/06/2012)|
| --------------------------------------------- |
当我尝试这样做时,我接近结果:
val output_df = input_df.map(x => (x(0), (x(1), x(2))) ).groupByKey()
我得到这个结果
(A,CompactBuffer((1, 12/06/2012), (2, 13/06/2012)))
(B,CompactBuffer((3, 12/06/2012), (4, 17/06/2012)))
(C,CompactBuffer((5, 14/06/2012)))
但是我不知道如何将其转换为DataFrame,这是否是实现此目标的好方法。
即使有其他方法,也欢迎任何建议
//请尝试一下
val conf = new SparkConf().setAppName("groupBy").setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val rdd = sc.parallelize(
Seq(("A",1,"12/06/2012"),("A",2,"13/06/2012"),("B",3,"12/06/2012"),("B",4,"17/06/2012"),("C",5,"14/06/2012")) )
val v1 = rdd.map(x => (x._1, x ))
val v2 = v1.groupByKey()
val v3 = v2.mapValues(v => v.toArray)
val df2 = v3.toDF("aKey","theValues")
df2.printSchema()
val first = df2.first
println (first)
println (first.getString(0))
val values = first.getSeq[Row](1)
val firstArray = values(0)
println (firstArray.getString(0)) //B
println (firstArray.getInt(1)) //3
println (firstArray.getString(2)) //12/06/2012
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句