別の列の値に基づく1つの列のpysparkラグ関数

いずれかの列の値に基づいてラグ値を作成できるようにしたい。

与えられたデータでは、QdfはQuestionデータフレームであり、AdfはAnswerデータフレームです。追加の説明列を追加しました(最終データでは実際には必要ありません)。

from pyspark.sql.window import Window
import pyspark.sql.functions as func
from pyspark.sql.types import *
from pyspark.sql import SQLContext

ID = ['A' for i in range(0,10)]+ ['B' for i in range(0,10)]
Day = range(1,11)+range(1,11)
Delay = [2, 2, 2, 3, 2, 4, 3, 2, 2, 2, 2, 2, 3, 2, 4, 3, 2, 2, 2, 3]
Despatched = [2, 3, 1, 4, 6, 2, 6, 5, 3, 6, 3, 1, 2, 4, 1, 2, 3, 3, 6, 1]
Delivered = [0, 0, 2, 3, 1, 0, 10, 0, 0, 13, 0, 0, 3, 1, 0, 6, 0, 0, 6, 3]
Explanation = ["-", "-", "-", "-", "-", "-", "10 (4+6)", "-", "-", "13 (2+6+5)", "-", "-", "-", "-", "-", "6 (2+4)", "-", "-", "6 (1+2+3)", "-"]

QSchema = StructType([StructField("ID", StringType()),StructField("Day", IntegerType()),StructField("Delay", IntegerType()),StructField("Despatched", IntegerType())])
Qdata = map(list, zip(*[ID,Day,Delay,Despatched]))
Qdf = spark.createDataFrame(Qdata,schema=QSchema) 
Qdf.show()


+---+---+-----+----------+
| ID|Day|Delay|Despatched|
+---+---+-----+----------+
|  A|  1|    2|         2|
|  A|  2|    2|         3|
|  A|  3|    2|         1|
|  A|  4|    3|         4|
|  A|  5|    2|         6|
|  A|  6|    4|         2|
|  A|  7|    3|         6|
|  A|  8|    2|         5|
|  A|  9|    2|         3|
|  A| 10|    2|         6|
|  B|  1|    2|         3|
|  B|  2|    2|         1|
|  B|  3|    3|         2|
|  B|  4|    2|         4|
|  B|  5|    4|         1|
|  B|  6|    3|         2|
|  B|  7|    2|         3|
|  B|  8|    2|         3|
|  B|  9|    2|         6|
|  B| 10|    3|         1|
+---+---+-----+----------+

発送数量は、遅延時間後に配達されたものとして記録する必要があります。理想的にはlag function、遅延に基づいてディスパッチされた列にを適用できれば素晴らしいと思いますAnswerデータセットは次のようになります。

Adata = map(list, zip(*[ID,Day,Delay,Despatched,Delivered,Explanation]))
ASchema = StructType([StructField("ID", StringType()),StructField("Day", IntegerType()),StructField("Delay", IntegerType()),StructField("Despatched", IntegerType()),StructField("Delivered", IntegerType()),StructField("Explanation", StringType())])
Adf = spark.createDataFrame(Adata,schema=ASchema) 
Adf.show()

+---+---+-----+----------+---------+-----------+
| ID|Day|Delay|Despatched|Delivered|Explanation|
+---+---+-----+----------+---------+-----------+
|  A|  1|    2|         2|        0|          -|
|  A|  2|    2|         3|        0|          -|
|  A|  3|    2|         1|        2|          -|
|  A|  4|    3|         4|        3|          -|
|  A|  5|    2|         6|        1|          -|
|  A|  6|    4|         2|        0|          -|
|  A|  7|    3|         6|       10|   10 (4+6)|
|  A|  8|    2|         5|        0|          -|
|  A|  9|    2|         3|        0|          -|
|  A| 10|    2|         6|       13| 13 (2+6+5)|
|  B|  1|    2|         3|        0|          -|
|  B|  2|    2|         1|        0|          -|
|  B|  3|    3|         2|        3|          -|
|  B|  4|    2|         4|        1|          -|
|  B|  5|    4|         1|        0|          -|
|  B|  6|    3|         2|        6|    6 (2+4)|
|  B|  7|    2|         3|        0|          -|
|  B|  8|    2|         3|        0|          -|
|  B|  9|    2|         6|        6|  6 (1+2+3)|
|  B| 10|    3|         1|        3|          -|
+---+---+-----+----------+---------+-----------+

以下のコードを試して、2の一定のラグを取得しました。

Qdf1=Qdf.withColumn('Delivered_lag',func.lag(Qdf['Despatched'],2).over(Window.partitionBy("ID").orderBy("Day")))

しかし、ある列でラグを使用し、別の列でラグを使用しようとすると、次のエラーが発生します。

Qdf1=Qdf.withColumn('Delivered_lag',func.lag(Qdf['Despatched'],Qdf['Delay']).over(Window.partitionBy("ID").orderBy("Day")))

TypeError: '列'オブジェクトは呼び出せません

どうすればこれを乗り越えることができますか?PySparkバージョン2.3.1とPythonバージョン2.7.13を使用しています。

クロノイク

ラグ-functionは、カウントパラメータとして固定値がかかりますが、何を行うことができますすることでループを作成することであるときそうでない場合は、あなたが欲しいものを手に入れます:

from pyspark.sql.window import Window
import pyspark.sql.functions as F
import pyspark.sql.types as T 

ID = ['A' for i in range(0,10)]+ ['B' for i in range(0,10)]
#I had to modify this line as I'am working with python3
Day = list(range(1,11))+list(range(1,11))
Delay = [2, 2, 2, 3, 2, 4, 3, 2, 2, 2, 2, 2, 3, 2, 4, 3, 2, 2, 2, 3]
Despatched = [2, 3, 1, 4, 6, 2, 6, 5, 3, 6, 3, 1, 2, 4, 1, 2, 3, 3, 6, 1]
Delivered = [0, 0, 2, 3, 1, 0, 10, 0, 0, 13, 0, 0, 3, 1, 0, 6, 0, 0, 6, 3]
Explanation = ["-", "-", "-", "-", "-", "-", "10 (4+6)", "-", "-", "13 (2+6+5)", "-", "-", "-", "-", "-", "6 (2+4)", "-", "-", "6 (1+2+3)", "-"]

QSchema = T.StructType([T.StructField("ID", T.StringType()),T.StructField("Day", T.IntegerType()),T.StructField("Delay", T.IntegerType()),T.StructField("Despatched", T.IntegerType())])
Qdata = map(list, zip(*[ID,Day,Delay,Despatched]))
Qdf = spark.createDataFrame(Qdata,schema=QSchema) 
#until here it was basically your code

#At first we add an empty Delivered_lag column to the Qdf
#That allows us to use the same functionality for all iterations of the following loop
Qdf = Qdf.withColumn('Delivered_lag',  F.lit(None).cast(T.IntegerType()))

#Now we loop over the distinctive values of Qdf.delay and run the lag function for every value
#otherwise is necessary to keep the previous calculated values 
for delay in Qdf.select('delay').distinct().collect():
    Qdf = Qdf.withColumn('Delivered_lag', F.when(Qdf['Delay'] == delay.delay, F.lag(Qdf['Despatched'],delay.delay).over(Window.partitionBy("ID").orderBy("Day"))).otherwise(Qdf['Delivered_lag']))

Qdf.show()

出力:

+---+---+-----+----------+-------------+ 
| ID|Day|Delay|Despatched|Delivered_lag|
+---+---+-----+----------+-------------+ 
|  B|  1|    2|         3|         null|
|  B|  2|    2|         1|         null|
|  B|  3|    3|         2|         null| 
|  B|  4|    2|         4|            1| 
|  B|  5|    4|         1|            3| 
|  B|  6|    3|         2|            2| 
|  B|  7|    2|         3|            1| 
|  B|  8|    2|         3|            2| 
|  B|  9|    2|         6|            3| 
|  B| 10|    3|         1|            3| 
|  A|  1|    2|         2|         null| 
|  A|  2|    2|         3|         null| 
|  A|  3|    2|         1|            2| 
|  A|  4|    3|         4|            2| 
|  A|  5|    2|         6|            1| 
|  A|  6|    4|         2|            3| 
|  A|  7|    3|         6|            4| 
|  A|  8|    2|         5|            2| 
|  A|  9|    2|         3|            6| 
|  A| 10|    2|         6|            5| 
+---+---+-----+----------+-------------+

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

分類Dev

別の列の値に基づく1つの列の平均値

分類Dev

R:別の列の値に基づく1つの列からの値のビニング

分類Dev

SQL複数の値を持つ1つの列に基づくフラグの追加

分類Dev

別の列の値に基づく1つの列のExcel連結

分類Dev

他の複数の列に基づく1つの列の最大値

分類Dev

別の列値ラムダ関数に基づくDatabricksコアラ列の割り当て

分類Dev

R-別の列を使用する関数に基づいて1つの列に値を追加する

分類Dev

複数のクエリに基づいて1つの列に多くの値を返す関数

分類Dev

2つの別々の列に基づく値の集計

分類Dev

別の値に基づく1つの列によるフィルタリング

分類Dev

1つの列の最大値と別の列の特定のIDに基づくSQL選択

分類Dev

別の配列の値に基づいて、1つの配列から複数の配列を作成する-python

分類Dev

期間に基づく1つの列の平均+別の列の条件

分類Dev

1つの列の値に基づいて、グループ全体で別の列の値を変更します

分類Dev

別の列の値に基づくopenpyxlカラーセル

分類Dev

2つの列に基づく最大値

分類Dev

pyspark:別のRDDの特定の列に基づいて1つのRDDをフィルタリングします

分類Dev

別の列に基づく日付のランキング-Spotfire

分類Dev

SQL / VBA-基準に基づく1つの列からの複数の列

分類Dev

1つの列に基づくSQLServer Group By

分類Dev

別の列の値に基づいてラグを作成する

分類Dev

SQL-テーブルの行を列の値に基づくラグ関数と比較する

分類Dev

別の列に基づいて1つの列の値を減算する

分類Dev

別の列の値に基づいて1つの列を合計します

分類Dev

別の列の値に基づく列の乱数

分類Dev

MySQL:別の列の値に基づく列の合計

分類Dev

別の列の値に基づく列の累積合計(R)

分類Dev

別のパンダの違いに基づく1つの列の違い

分類Dev

groupbyと1列の文字列に基づくランキング

Related 関連記事

  1. 1

    別の列の値に基づく1つの列の平均値

  2. 2

    R:別の列の値に基づく1つの列からの値のビニング

  3. 3

    SQL複数の値を持つ1つの列に基づくフラグの追加

  4. 4

    別の列の値に基づく1つの列のExcel連結

  5. 5

    他の複数の列に基づく1つの列の最大値

  6. 6

    別の列値ラムダ関数に基づくDatabricksコアラ列の割り当て

  7. 7

    R-別の列を使用する関数に基づいて1つの列に値を追加する

  8. 8

    複数のクエリに基づいて1つの列に多くの値を返す関数

  9. 9

    2つの別々の列に基づく値の集計

  10. 10

    別の値に基づく1つの列によるフィルタリング

  11. 11

    1つの列の最大値と別の列の特定のIDに基づくSQL選択

  12. 12

    別の配列の値に基づいて、1つの配列から複数の配列を作成する-python

  13. 13

    期間に基づく1つの列の平均+別の列の条件

  14. 14

    1つの列の値に基づいて、グループ全体で別の列の値を変更します

  15. 15

    別の列の値に基づくopenpyxlカラーセル

  16. 16

    2つの列に基づく最大値

  17. 17

    pyspark:別のRDDの特定の列に基づいて1つのRDDをフィルタリングします

  18. 18

    別の列に基づく日付のランキング-Spotfire

  19. 19

    SQL / VBA-基準に基づく1つの列からの複数の列

  20. 20

    1つの列に基づくSQLServer Group By

  21. 21

    別の列の値に基づいてラグを作成する

  22. 22

    SQL-テーブルの行を列の値に基づくラグ関数と比較する

  23. 23

    別の列に基づいて1つの列の値を減算する

  24. 24

    別の列の値に基づいて1つの列を合計します

  25. 25

    別の列の値に基づく列の乱数

  26. 26

    MySQL:別の列の値に基づく列の合計

  27. 27

    別の列の値に基づく列の累積合計(R)

  28. 28

    別のパンダの違いに基づく1つの列の違い

  29. 29

    groupbyと1列の文字列に基づくランキング

ホットタグ

アーカイブ