我有一个spark-dataframe
名为的输入df
,
+---------------+---+---+---+---+
| CustomerID| P1| P2| P3| P4|
+---------------+---+---+---+---+
| 725153| 5| 6| 7| 8|
| 873008| 7| 8| 1| 2|
| 725116| 5| 6| 3| 2|
| 725110| 0| 1| 2| 5|
+---------------+---+---+---+---+
其中,P1,P2,P3,P4
我需要为每个CustomerID
. 并获得等价物column name
并将其放入df
.So 所以我的结果dataframe
应该是,
+---------------+----+----+
| CustomerID|col1|col2|
+---------------+----+----+
| 725153| P4| P3|
| 873008| P2| P1|
| 725116| P2| P1|
| 725110| P4| P3|
+---------------+----+----+
这里为第一行,8
并且7
是最大值。每个等效的列名称是P4
和P3
。因此,对于它的特殊性CustomerID
,它应该包含值P4
和P3
。这可以pyspark
通过使用pandas
数据框来实现。
nlargest = 2
order = np.argsort(-df.values, axis=1)[:, :nlargest]
result = pd.DataFrame(df.columns[order],columns=['top{}'.format(i) for i in range(1, nlargest+1)],index=recommend_df.index)
但是我怎样才能做到这一点scala
呢?
您可以使用UDF
来获得您想要的结果。在UDF
你zip
所有的各自的值的列名,然后排序Array
按价值终于从它返回前两名列名。下面是相同的代码。
//get all the columns that you want
val requiredCol = df.columns.zipWithIndex.filter(_._2!=0).map(_._1)
//define a UDF which sorts according to the value and returns top two column names
val topTwoColumns = udf((seq: Seq[Int]) =>
seq.zip(requiredCol).
sortBy(_._1)(Ordering[Int].reverse).
take(2).map(_._2))
现在,您可以使用withColumn
列值并将其array
作为先前定义的 UDF传递。
df.withColumn("col", topTwoColumns(array(requiredCol.map(col(_)):_*))).
select($"CustomerID",
$"col".getItem(0).as("col1"),
$"col".getItem(1).as("col2")).show
//output
//+----------+----+----+
//|CustomerID|col1|col2|
//+----------+----+----+
//| 725153| P4| P3|
//| 873008| P2| P1|
//| 725116| P2| P1|
//| 725110| P4| P3|
//+----------+----+----+
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句