pyspark中的熊猫UDF

马可·德·维尔吉利斯

我正在尝试对spark数据框进行一系列观察。基本上,我有一份清单,我应该为每组创建缺少的清单。
在pandas中有此reindex功能,在pyspark中不可用。
我试图实现一个熊猫UDF:

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def reindex_by_date(df):
    df = df.set_index('dates')
    dates = pd.date_range(df.index.min(), df.index.max())
    return df.reindex(dates, fill_value=0).ffill()

看起来应该执行我需要的操作,但是此消息失败AttributeError: Can only use .dt accessor with datetimelike values我在这里做错了什么?
这里是完整的代码:

data = spark.createDataFrame(
        [(1, "2020-01-01", 0), 
        (1, "2020-01-03", 42), 
        (2, "2020-01-01", -1), 
        (2, "2020-01-03", -2)],
        ('id', 'dates', 'value'))

data = data.withColumn('dates', col('dates').cast("date"))

schema = StructType([
     StructField('id', IntegerType()),
     StructField('dates', DateType()),
     StructField('value', DoubleType())])

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def reindex_by_date(df):
     df = df.set_index('dates')
     dates = pd.date_range(df.index.min(), df.index.max())
     return df.reindex(dates, fill_value=0).ffill()

data = data.groupby('id').apply(reindex_by_date)

理想情况下,我想要这样的东西:

+---+----------+-----+                                                          
| id|     dates|value|
+---+----------+-----+
|  1|2020-01-01|    0|
|  1|2020-01-02|    0|
|  1|2020-01-03|   42|
|  2|2020-01-01|   -1|
|  2|2020-01-02|    0|
|  2|2020-01-03|   -2|
+---+----------+-----+
mpSchrader

情况1:每个ID都有各自的日期范围。

我会尽量减少udf的内容。在这种情况下,我只会在udf中计算每个ID的日期范围。对于其他部分,我将使用Spark本机函数。

from pyspark.sql import types as T
from pyspark.sql import functions as F

# Get min and max date per ID
date_ranges = data.groupby('id').agg(F.min('dates').alias('date_min'), F.max('dates').alias('date_max'))

# Calculate the date range for each ID
@F.udf(returnType=T.ArrayType(T.DateType()))
def get_date_range(date_min, date_max):
  return [t.date() for t in list(pd.date_range(date_min, date_max))]

# To get one row per potential date, we need to explode the UDF output
date_ranges = date_ranges.withColumn(
  'dates',
  F.explode(get_date_range(F.col('date_min'), F.col('date_max')))
)

date_ranges = date_ranges.drop('date_min', 'date_max')

# Add the value for existing entries and add 0 for others
result = date_ranges.join(
  data,
  ['id', 'dates'],
  'left'
)

result = result.fillna({'value': 0})

情况2:所有ID的日期范围相同

我认为这里不需要使用UDF。您想要的内容可以通过其他方式进行存档:首先,您将获得所有可能的ID和所有必要的日期。其次,交叉加入它们,这将为您提供所有可能的组合。第三,左键将原始数据合并到组合中。第四,将出现的空值替换为0。

# Get all unique ids
ids_df = data.select('id').distinct()

# Get the date series
date_min, date_max = data.agg(F.min('dates'), F.max('dates')).collect()[0]
dates = [[t.date()] for t in list(pd.date_range(date_min, date_max))]
dates_df = spark.createDataFrame(data=dates, schema="dates:date")

# Calculate all combinations
all_comdinations = ids_df.crossJoin(dates_df)

# Add the value column
result = all_comdinations.join(
  data,
  ['id', 'dates'],
  'left'
)

# Replace all null values with 0
result = result.fillna({'value': 0})

请注意此解决方案的以下限制:

  1. crossJoins可能会非常昂贵。可以在此相关问题中找到解决该问题的一种潜在解决方案
  2. collect语句和对Pandas的使用导致未完全并行化的Spark转换。

[编辑]分为两种情况,因为我首先认为所有ID都具有相同的日期范围。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

pyspark中的熊猫UDF

来自分类Dev

pyarrow错误:在pyspark中运行熊猫udf时

来自分类Dev

pySpark 中的 udf for 循环

来自分类Dev

在熊猫udf pyspark内部使用numpy

来自分类Dev

在PySpark中重新加载UDF

来自分类Dev

熊猫udf在PySpark数据帧行上循环

来自分类Dev

PySpark DataFrame中向量列上的UDF问题

来自分类Dev

pyspark中UDF的返回类型无效

来自分类Dev

在pyspark中对列表进行排序的udf

来自分类Dev

if 错误中的 pyspark udf 类似条件

来自分类Dev

Pyspark:在 UDF 中传递动态列

来自分类Dev

如何在PySpark的UDF中返回“元组类型”?

来自分类Dev

python udf中的pyspark字符串操作

来自分类Dev

udf(用户定义函数)如何在 pyspark 中工作?

来自分类Dev

在 pyspark 中的数据帧上应用 udf 后出错

来自分类Dev

PySpark UDF优化挑战

来自分类Dev

熊猫标量UDF失败,IllegalArgumentException

来自分类Dev

熊猫UDF和pyarrow 0.15.0

来自分类Dev

无法捕获Pyspark UDF异常

来自分类Dev

当函数在Pandas数据框中工作时,PySpark udf返回null

来自分类Dev

将整个行作为附加参数传递给PySpark中的UDF

来自分类Dev

如何在pyspark中使用pandas_udf拆分数据帧中的字符串

来自分类Dev

熊猫udf showString错误的简单示例

来自分类Dev

使用UDF进行Pyspark数据框联接

来自分类Dev

Pyspark UDF函数引发错误

来自分类Dev

pyspark如何使用两列编写UDF

来自分类Dev

在大数据上优化Pyspark UDF

来自分类Dev

pyspark提示未定义udf的错误

来自分类Dev

如何将第二个数据帧的列传递到 PySpark 1.6.1 中的 UDF