製品が(毎日)何回使用されたかを示すSparkデータフレームがあります。次のようになります。
| x_id | product | usage | yyyy_mm_dd | status |
|------|---------|-------|------------|--------|
| 10 | prod_go | 15 | 2020-10-10 | i |
| 10 | prod_rv | 7 | 2020-10-10 | fc |
| 10 | prod_mb | 0 | 2020-10-10 | n |
| 15 | prod_go | 0 | 2020-10-10 | n |
| 15 | prod_rv | 5 | 2020-10-10 | fc |
| 15 | prod_mb | 1 | 2020-10-10 | fc |
| 10 | prod_go | 20 | 2020-10-11 | i |
| 10 | prod_rv | 11 | 2020-10-11 | i |
| 10 | prod_mb | 3 | 2020-10-11 | fc |
| 15 | prod_go | 0 | 2020-10-11 | n |
| 15 | prod_rv | 5 | 2020-10-11 | fc |
| 15 | prod_mb | 1 | 2020-10-11 | fc |
ステータス列はに基づいていusage
ます。usage
が0の場合、はになりますn
。usage
が1から9の間の場合、はstatus
fcになります。場合usage
である> = 10は、その後、status
私は次のようになります。
私は、このスパークデータフレームに二つの追加の列を紹介したい、ですdate_reached_fc
とdate_reached_i
。これらの列は、の各ステータスに到達したmin(yyyy_mm_dd)
ときx_id
をそれぞれ保持する必要がありますproduct
。
サンプルデータに基づくと、出力は次のようになります。
| x_id | product | usage | yyyy_mm_dd | status | date_reached_fc | date_reached_i |
|------|---------|-------|------------|--------|-----------------|----------------|
| 10 | prod_go | 15 | 2020-10-10 | i | null | 2020-10-10 |
| 10 | prod_rv | 7 | 2020-10-10 | fc | 2020-10-10 | null |
| 10 | prod_mb | 0 | 2020-10-10 | n | null | null |
| 15 | prod_go | 0 | 2020-10-10 | n | null | null |
| 15 | prod_rv | 5 | 2020-10-10 | fc | 2020-10-10 | null |
| 15 | prod_mb | 1 | 2020-10-10 | fc | 2020-10-10 | null |
| 10 | prod_go | 20 | 2020-10-11 | i | null | 2020-10-10 |
| 10 | prod_rv | 11 | 2020-10-11 | i | 2020-10-10 | 2020-10-11 |
| 10 | prod_mb | 3 | 2020-10-11 | fc | 2020-10-11 | null |
| 15 | prod_go | 0 | 2020-10-11 | n | null | null |
| 15 | prod_rv | 5 | 2020-10-11 | fc | 2020-10-10 | null |
| 15 | prod_mb | 1 | 2020-10-11 | fc | 2020-10-10 | null |
順序は質問とは少し異なりますが、結果は正しいはずです...基本的にmin
はウィンドウ上で使用when
し、関連する日付のみをフィルタリングするためにも使用します。
from pyspark.sql import functions as F, Window
df2 = df.withColumn(
'date_reached_fc',
F.min(F.when(F.col('status') == 'fc', F.col('yyyy_mm_dd'))).over(Window.partitionBy('x_id', 'product').orderBy('yyyy_mm_dd', 'usage'))
).withColumn(
'date_reached_i',
F.min(F.when(F.col('status') == 'i', F.col('yyyy_mm_dd'))).over(Window.partitionBy('x_id', 'product').orderBy('yyyy_mm_dd', 'usage'))
).orderBy('x_id', 'product', 'yyyy_mm_dd', 'usage')
df2.show()
+----+-------+-----+----------+------+---------------+--------------+
|x_id|product|usage|yyyy_mm_dd|status|date_reached_fc|date_reached_i|
+----+-------+-----+----------+------+---------------+--------------+
| 10|prod_go| 15|2020-10-10| i| null| 2020-10-10|
| 10|prod_go| 20|2020-10-11| i| null| 2020-10-10|
| 10|prod_mb| 0|2020-10-10| n| null| null|
| 10|prod_mb| 3|2020-10-11| fc| 2020-10-11| null|
| 10|prod_rv| 7|2020-10-10| fc| 2020-10-10| null|
| 10|prod_rv| 11|2020-10-11| i| 2020-10-10| 2020-10-11|
| 15|prod_go| 0|2020-10-10| n| null| null|
| 15|prod_go| 0|2020-10-11| n| null| null|
| 15|prod_mb| 1|2020-10-10| fc| 2020-10-10| null|
| 15|prod_mb| 1|2020-10-11| fc| 2020-10-10| null|
| 15|prod_rv| 5|2020-10-10| fc| 2020-10-10| null|
| 15|prod_rv| 5|2020-10-11| fc| 2020-10-10| null|
+----+-------+-----+----------+------+---------------+--------------+
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加