I have a PySpark dataframe with an "id" column and a date column "parsed_date" (dtype: date, format: YYYY-mm-dd). I wrote a function that gives me the count of ids for each day in a given date range.
The function returns 2 dataframes: df1 holds rows within ±1 week of the given day, while df2 holds rows within ±2 weeks of the given day, as follows:
df1 should filter rows in range 1: (day - t, day + t). df2 should filter rows in range 2: (day - 2t, day - t) appended with (day + t, day + 2t).
Here is the code I have been using to create df1 with the desired date range, but I don't know how to append/merge the two date ranges for df2:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def part_1(df, day, t):
    """
    Example usage: df_list = part_1(df, '2017-12-18', 2)
    Returns a list of 2 dataframes.
    """
    h1_df1 = (df.filter(f"parsed_date between '{day}' - interval {t} days and '{day}' + interval {t} days")
              .withColumn('count_before', F.count('id').over(Window.partitionBy('parsed_date')))
              .orderBy('parsed_date')
              )
    # this part does not work: DataFrame has no .concat() method
    h1_df2 = (df.filter(f"parsed_date between '{day}' - interval {t*2} days and '{day}' - interval {t} days")
              .concat(f"parsed_date between '{day}' + interval {t} days and '{day}' + interval {t*2} days")
              .withColumn('count_after', F.count('id').over(Window.partitionBy('parsed_date')))
              .orderBy('parsed_date')
              )
    return [h1_df1, h1_df2]
Output of h1_df1 when calling part_1(df, '2017-12-18', 2):
+-------+-----------+------------+
| id|parsed_date|count_before|
+-------+-----------+------------+
|1471783| 2017-12-16| 2|
|1471885| 2017-12-16| 2|
|1472928| 2017-12-17| 2|
|1476917| 2017-12-17| 2|
|1477469| 2017-12-18| 1|
|1478190| 2017-12-19| 4|
|1478570| 2017-12-19| 4|
|1481415| 2017-12-19| 4|
|1472592| 2017-12-19| 4|
|1474023| 2017-12-20| 1|
+-------+-----------+------------+
Expected output of h1_df2 when calling part_1(df, '2017-12-18', 2):
+-------+-----------+------------+
| id|parsed_date| count_after|
+-------+-----------+------------+
|1471783| 2017-12-14| 1|
|1471885| 2017-12-16| 3|
|1472928| 2017-12-16| 3|
|1476917| 2017-12-16| 3|
|1477469| 2017-12-20| 2|
|1478190| 2017-12-20| 2|
|1478570| 2017-12-21| 2|
|1481415| 2017-12-21| 2|
|1472592| 2017-12-22| 2|
|1474023| 2017-12-22| 2|
+-------+-----------+------------+
I would love some help creating h1_df2. I tried, but it doesn't work.
Please help!
You can use or instead of concat to combine the two filter conditions:
h1_df2 = (df.filter(f"""
(parsed_date between '{day}' - interval {t*2} days and '{day}' - interval {t} days)
or
(parsed_date between '{day}' + interval {t} days and '{day}' + interval {t*2} days)
""")
.withColumn('count_after', F.count('id').over(Window.partitionBy('parsed_date')))
.orderBy('parsed_date')
)
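The date arithmetic behind that combined condition can be checked in plain Python with datetime, independent of Spark. This is a minimal sketch of the outer-window membership test (the function name in_outer_window is my own illustration, not part of the question's code); like SQL's between, both boundaries are treated as inclusive, which matches the expected output above (2017-12-14 and 2017-12-22 are included for t=2):

```python
from datetime import date, timedelta

def in_outer_window(d, day, t):
    """True if d falls in [day-2t, day-t] or [day+t, day+2t] (inclusive)."""
    lo1, hi1 = day - timedelta(days=2 * t), day - timedelta(days=t)
    lo2, hi2 = day + timedelta(days=t), day + timedelta(days=2 * t)
    return lo1 <= d <= hi1 or lo2 <= d <= hi2

day = date(2017, 12, 18)
print(in_outer_window(date(2017, 12, 15), day, 2))  # True  (in 12-14..12-16)
print(in_outer_window(date(2017, 12, 18), day, 2))  # False (inside the inner window)
print(in_outer_window(date(2017, 12, 22), day, 2))  # True  (in 12-20..12-22)
```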