我得到的代码来自以下链接:https : //johnpaton.net/posts/forward-fill-spark/它具有我要完成的任务的某些背景。
from pyspark.sql import Window
from pyspark.sql.functions import last
# define the window
window = Window.partitionBy('location')\
.orderBy('time')\
.rowsBetween(-sys.maxsize, 0)
# define the forward-filled column
filled_column = last(spark_df['temperature'], ignorenulls=True).over(window)
# do the fill
spark_df_filled = spark_df.withColumn('temp_filled_spark', filled_column)
基本上,last()
函数用于查找最后一个非空值的状态。如果所有值都为null,则返回null。
但是,如果该组中的所有列均为空,我想分配一个默认值。我尝试了不同的方法,但无法弄清楚。
因此,基本上,如果某个位置的温度都为零,那么我希望有一种方法将其设置为默认值。
Some examples:
I want to fill them with default values for the case below:
location temp temp
1 null 0
1 null =====> 0
1 null 0
I do not want to fill them with default values for the case below:
location temp temp
1 null null
1 50 ======> 50
1 60 60
如果给定位置的任何记录包含非空值,则可以定义另一列以用作指示符。例如:
window_2 = Window.partitionBy('location').rowsBetween(-sys.maxsize, sys.maxsize)
max_column = max(spark_df['temperature']).over(window_2)
然后,将该列与您的列一起使用filled_column
以有条件地填写最终结果:
temp_filled_spark = when(max_column.isNull(),0).otherwise(filled_column)
spark_df_filled = spark_df.withColumn('temp_filled_spark', temp_filled_spark)
可能不是很优雅或性能很好,但是应该可以工作。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句