我有一个由这两列组成的spark数据框,我想根据时间戳列上的简单条件为每一行提供一个ID。对于每个用户,如果下一个和上一个时间戳之间的差小于10秒,我将继续附加相同的ID,否则我将更新该ID并继续进行,直到为每行分配一个ID。
自从我使用pyspark以来,所有内容都应该用python编码。
为了使事情更容易理解,这里有一个例子:
启动DF
+------------------+
| User| timestamp|
+------------------+
| user0| 100 |
| user1| 102 |
| user0| 109 |
| user2| 103 |
| user1| 108 |
| user0| 119 |
| user0| 140 |
| user0| 142 |
+------------------+
所需的DF是这样的
+----------------------+
| User| timestamp| ID|
+----------------------+
| user0| 100 | 1|
| user1| 102 | 2|
| user0| 109 | 1|
| user2| 103 | 3|
| user1| 108 | 2|
| user0| 119 | 1|
| user0| 140 | 4|
| user0| 142 | 4|
+----------------------+
或者如果alghoritm在给定用户的ID之前分配,则可能是类似的事情,我并不在乎,也可以
+----------------------+
| User| timestamp| ID|
+----------------------+
| user0| 100 | 1|
| user1| 102 | 3|
| user0| 109 | 1|
| user2| 103 | 4|
| user1| 108 | 3|
| user0| 119 | 1|
| user0| 140 | 2|
| user0| 142 | 2|
+----------------------+
如您所见,时间戳为140的user0具有不同的ID(2),因为与前一个时间戳的差大于10。
如果我可以循环并动态分配每个单元格,这将很容易,但是它违反了使用spark数据帧的目的,而且我认为由于它们是不可变的,因此无法实现。
在Spark中最有效的方法是什么?提前致谢!
您可以首先为每个用户生成一个ID,然后按如下所示将它们组合到不同的用户中。
import pyspark.sql.functions as F
from pyspark.sql.window import Window
w = Window.partitionBy('User').orderBy('timestamp')
df2 = df.withColumn(
'begin',
F.coalesce(
F.col('timestamp') - F.lag('timestamp').over(w) > 10,
F.lit(True)
).cast('int')
).withColumn(
'userid',
F.sum('begin').over(w.rowsBetween(Window.unboundedPreceding, 0))
).withColumn(
'ID',
F.dense_rank().over(Window.orderBy('userid', 'User'))
)
# If you just want to keep your columns, do:
# df2 = df2.select('User', 'timestamp', 'ID')
df2.show()
+-----+---------+-----+------+---+
| User|timestamp|begin|userid| ID|
+-----+---------+-----+------+---+
|user0| 100| 1| 1| 1|
|user0| 109| 0| 1| 1|
|user0| 119| 0| 1| 1|
|user1| 102| 1| 1| 2|
|user1| 108| 0| 1| 2|
|user2| 103| 1| 1| 3|
|user0| 140| 1| 2| 4|
|user0| 142| 0| 2| 4|
+-----+---------+-----+------+---+
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句