pysparkの累積合計

アスマダマニ

クラスごとの累積合計を計算しようとしています。sum(df.value).over(Window.partitionBy( '​​class')。orderBy( '​​time'))を使用すると、コードは正常に機能します。

df = sqlContext.createDataFrame( [(1,10,"a"),(3,2,"a"),(1,2,"b"),(2,5,"a"),(2,1,"b"),(9,0,"b"),(4,1,"b"),(7,8,"a"),(3,8,"b"),(2,5,"a"),(0,0,"a"),(4,3,"a")], 
                                     ["time", "value", "class"] )

time|value|class|
+----+-----+-----+
|   1|   10|    a|
|   3|    2|    a|
|   1|    2|    b|
|   2|    5|    a|
|   2|    1|    b|
|   9|    0|    b|
|   4|    1|    b|
|   7|    8|    a|
|   3|    8|    b|
|   2|    5|    a|
|   0|    0|    a|
|   4|    3|    a|


df.withColumn('cumsum_value', sum(df.value).over(Window.partitionBy('class').orderBy('time'))).show()


time|value|class|cumsum_value|
+----+-----+-----+------------+
|   1|    2|    b|           2|
|   2|    1|    b|           3|
|   3|    8|    b|          11|
|   4|    1|    b|          12|
|   9|    0|    b|          12|
|   0|    0|    a|           0|
|   1|   10|    a|          10|
|   2|    5|    a|          20|
|   2|    5|    a|          20|
|   3|    2|    a|          22|
|   4|    3|    a|          25|
|   7|    8|    a|          33|
+----+-----+-----+------------+

ただし、重複する行では機能しません。必要な出力は次のとおりです。

 time|value|class|cumsum_value|
+----+-----+-----+------------+
|   1|    2|    b|           2|
|   2|    1|    b|           3|
|   3|    8|    b|          11|
|   4|    1|    b|          12|
|   9|    0|    b|          12|
|   0|    0|    a|           0|
|   1|   10|    a|          10|
|   2|    5|    a|          15|
|   2|    5|    a|          20|
|   3|    2|    a|          22|
|   4|    3|    a|          25|
|   7|    8|    a|          33|
+----+-----+-----+------------+
murtihash

@paultのコメントに加えて、にrow_number()基づいて計算orderBy('time', 'value')し、orderBy別のwindow(w2)のその列を使用してを取得することをお勧めしますcum_sum

これは、時間が同じで値が同じである場合と、時間が同じで値が同じでない場合の両方を処理します。

from pyspark.sql import functions as F
from pyspark.sql.window import Window
w1=Window().partitionBy("class").orderBy("time","value")
w2=Window().partitionBy("class").orderBy('rownum')
df.withColumn('rownum', F.row_number().over(w1))\
  .withColumn('cumsum_value', F.sum("value").over(w2)).drop('rownum').show()

+----+-----+-----+------------+
|time|value|class|cumsum_value|
+----+-----+-----+------------+
|   1|    2|    b|           2|
|   2|    1|    b|           3|
|   3|    8|    b|          11|
|   4|    1|    b|          12|
|   9|    0|    b|          12|
|   0|    0|    a|           0|
|   1|   10|    a|          10|
|   2|    5|    a|          15|
|   2|    5|    a|          20|
|   3|    2|    a|          22|
|   4|    3|    a|          25|
|   7|    8|    a|          33|
+----+-----+-----+------------+

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事