pyspark用与最后一个非空值相关的一些计算替换空值

瓦西夫·坦维尔

嗨,我的问题与This(在pyspark中用先前已知的好值填入null)有关,但是我的问题的要求有一点变化:

   data:                                        expected output:       
   +------+-----+---------+---------+-----+     +------+-----+---------+---------+-----+
   |  item|store|timestamp|sales_qty|stock|     |  item|store|timestamp|sales_qty|stock|
   +------+-----+---------+---------+-----+     +------+-----+---------+---------+-----+
   |673895|35578| 20180101|        1| null|     |673895|35578| 20180101|        1| null|
   |673895|35578| 20180102|        0|  110|     |673895|35578| 20180102|        0|  110|
   |673895|35578| 20180103|        1| null|     |673895|35578| 20180103|        1|  109|
   |673895|35578| 20180104|        0| null|     |673895|35578| 20180104|        0|  109|
   |673895|35578| 20180105|        0|  109|  => |673895|35578| 20180105|        0|  109|
   |673895|35578| 20180106|        1| null|     |673895|35578| 20180106|        1|  108|
   |673895|35578| 20180107|        0|  108|     |673895|35578| 20180107|        0|  108|
   |673895|35578| 20180108|        0| null|     |673895|35578| 20180108|        0|  108|
   |673895|35578| 20180109|        0| null|     |673895|35578| 20180109|        0|  108|
   |673895|35578| 20180110|        1| null|     |673895|35578| 20180110|        1|  107|
   +------+-----+---------+---------+-----+     +------+-----+---------+---------+-----+

我的预期输出基于最后一个已知的非null值和sales_qty,如果存在sales_qty,则应根据该值调整库存值。我尝试了以下逻辑

        my_window = Window.partitionBy('item','store').orderBy('timestamp')
        df = df.withColumn("stock", F.when((F.isnull(F.col('stock'))),F.lag(df.stock).over(my_window)-F.col('sales_qty')).otherwise(F.col('stock')))

但是它仅适用于一个空值,有人可以帮助我达到预期的结果吗?

注意:数量并非总是连续减少,因此需要考虑最后一个非空值来计算新的

Murtihash

你可以试试看。我基本上首先生成两列(第一个非null值= 110)和stock2(基本上是股票的增量总和),然后将它们彼此相减以获得所需的股票。

from pyspark.sql.window import Window
from pyspark.sql import functions as F
w=Window().partitionBy("item","store").orderBy("timestamp")
w2=Window().partitionBy("item","store").orderBy("timestamp").rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)
df.withColumn("stock1", F.when(F.col("stock").isNull(), F.lit(0)).otherwise(F.col("stock")))\
.withColumn("stock2", F.sum("sales_qty").over(w)- F.lit(1))\
.withColumn("first", F.first("stock", True).over(w2))\
.withColumn("stock", F.col("first")-F.col("stock2"))\
.drop("stock1","stock2","first")\
.show()

+------+-----+---------+---------+-----+
|  item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101|        1|  110|
|673895|35578| 20180102|        0|  110|
|673895|35578| 20180103|        1|  109|
|673895|35578| 20180104|        0|  109|
|673895|35578| 20180105|        0|  109|
|673895|35578| 20180106|        1|  108|
|673895|35578| 20180107|        0|  108|
|673895|35578| 20180108|        0|  108|
|673895|35578| 20180109|        0|  108|
|673895|35578| 20180110|        1|  107|
+------+-----+---------+---------+-----+

如果您想将第一个值强制为null而不是110(如所需输出所示),则可以使用它(基本上使用行号将第一个110值替换为null):

from pyspark.sql.window import Window
from pyspark.sql import functions as F
w=Window().partitionBy("item","store").orderBy("timestamp")
w2=Window().partitionBy("item","store").orderBy("timestamp").rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)
df.withColumn("stock1", F.when(F.col("stock").isNull(), F.lit(0)).otherwise(F.col("stock")))\
.withColumn("stock2", F.sum("sales_qty").over(w)- F.lit(1))\
.withColumn("first", F.first("stock", True).over(w2))\
.withColumn("stock", F.col("first")-F.col("stock2"))\
.withColumn("num", F.row_number().over(w))\
.withColumn("stock", F.when(F.col("num")==1, F.lit(None)).otherwise(F.col("stock")))\
.drop("stock1","stock2","first","num")\
.show()


+------+-----+---------+---------+-----+
|  item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101|        1| null|
|673895|35578| 20180102|        0|  110|
|673895|35578| 20180103|        1|  109|
|673895|35578| 20180104|        0|  109|
|673895|35578| 20180105|        0|  109|
|673895|35578| 20180106|        1|  108|
|673895|35578| 20180107|        0|  108|
|673895|35578| 20180108|        0|  108|
|673895|35578| 20180109|        0|  108|
|673895|35578| 20180110|        1|  107|
+------+-----+---------+---------+-----+

附加数据输入和输出:

#input1
+------+-----+---------+---------+-----+
|  item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101|        1| null|
|673895|35578| 20180102|        0|  110|
|673895|35578| 20180103|        1| null|
|673895|35578| 20180104|        3| null|
|673895|35578| 20180105|        0|  109|
|673895|35578| 20180106|        1| null|
|673895|35578| 20180107|        0|  108|
|673895|35578| 20180108|        4| null|
|673895|35578| 20180109|        0| null|
|673895|35578| 20180110|        1| null|
+------+-----+---------+---------+-----+

#output1
+------+-----+---------+---------+-----+
|  item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101|        1| null|
|673895|35578| 20180102|        0|  110|
|673895|35578| 20180103|        1|  109|
|673895|35578| 20180104|        3|  106|
|673895|35578| 20180105|        0|  106|
|673895|35578| 20180106|        1|  105|
|673895|35578| 20180107|        0|  105|
|673895|35578| 20180108|        4|  101|
|673895|35578| 20180109|        0|  101|
|673895|35578| 20180110|        1|  100|
+------+-----+---------+---------+-----+


#input2
+------+-----+---------+---------+-----+
|  item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101|        1| null|
|673895|35578| 20180102|        0|  110|
|673895|35578| 20180103|        1| null|
|673895|35578| 20180104|        7| null|
|673895|35578| 20180105|        0|  102|
|673895|35578| 20180106|        0| null|
|673895|35578| 20180107|        4|   98|
|673895|35578| 20180108|        0| null|
|673895|35578| 20180109|        0| null|
|673895|35578| 20180110|        1| null|
+------+-----+---------+---------+-----+

#output2
+------+-----+---------+---------+-----+
|  item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101|        1| null|
|673895|35578| 20180102|        0|  110|
|673895|35578| 20180103|        1|  109|
|673895|35578| 20180104|        7|  102|
|673895|35578| 20180105|        0|  102|
|673895|35578| 20180106|        0|  102|
|673895|35578| 20180107|        4|   98|
|673895|35578| 20180108|        0|   98|
|673895|35578| 20180109|        0|   98|
|673895|35578| 20180110|        1|   97|
+------+-----+---------+---------+-----+

IFstock 数量不是像这样连续的

  df.show()

+------+-----+---------+---------+-----+
|  item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101|        1| null|
|673895|35578| 20180102|        0|  110|
|673895|35578| 20180103|        1| null|
|673895|35578| 20180104|        7| null|
|673895|35578| 20180105|        0|  112|
|673895|35578| 20180106|        2| null|
|673895|35578| 20180107|        0|  107|
|673895|35578| 20180108|        0| null|
|673895|35578| 20180109|        0| null|
|673895|35578| 20180110|        1| null|
+------+-----+---------+---------+-----+

您可以使用此方法:(我基本上为每个非null的末尾计算一个动态窗口)

from pyspark.sql.window import Window
from pyspark.sql import functions as F


w=Window().partitionBy("item","store").orderBy("timestamp")
w3=Window().partitionBy("item","store","stock5").orderBy("timestamp")
df.withColumn("stock1", F.when(F.col("stock").isNull(), F.lit(0)).otherwise(F.col("stock")))\
.withColumn("stock4", F.when(F.col("stock1")!=0, F.rank().over(w)).otherwise(F.col("stock1")))\
.withColumn("stock5", F.sum("stock4").over(w))\
.withColumn("stock6", F.sum("stock1").over(w3))\
.withColumn("sum", F.sum(F.when(F.col("stock1")!=F.col("stock6"),F.col("sales_qty")).otherwise(F.lit(0))).over(w3))\
.withColumn("stock2", F.when(F.col("sales_qty")!=0, F.col("stock6")-F.col("sum")).otherwise(F.col("stock")))\
.withColumn("stock", F.when((F.col("stock2").isNull())&(F.col("sales_qty")==0),F.col("stock6")-F.col("sum")).otherwise(F.col("stock2")))\
.drop("stock1","stock4","stock5","stock6","sum","stock2")\
.show()

+------+-----+---------+---------+-----+
|  item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101|        1|    0|
|673895|35578| 20180102|        0|  110|
|673895|35578| 20180103|        1|  109|
|673895|35578| 20180104|        7|  102|
|673895|35578| 20180105|        0|  112|
|673895|35578| 20180106|        2|  110|
|673895|35578| 20180107|        0|  107|
|673895|35578| 20180108|        0|  107|
|673895|35578| 20180109|        0|  107|
|673895|35578| 20180110|        1|  106|
+------+-----+---------+---------+-----+

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

用最后一个非空值填充空值-Oracle SQL

来自分类Dev

用最后一个非 NaN 值替换 NaN

来自分类Dev

将空值设置为列表中最接近的最后一个非空值-LINQ

来自分类Dev

用 Amibroker 中的最后一个非零值替换数组的零

来自分类Dev

MDX查询以查找icCube中的最后一个非空值

来自分类Dev

SQL(Postgres)使用最后一个非空天的值填写缺少的日期

来自分类Dev

Python数据框获取每行最后一个非空列的值

来自分类Dev

打印awk中最后一个非空列的值

来自分类Dev

返回以值为条件的最后一个非空单元格

来自分类Dev

BigQuery-选择序列中的最后一个非空值

来自分类Dev

如何在usql窗口表达式中获取最后一个非空值?

来自分类Dev

如何在关系表中获取非空的最后一个值

来自分类Dev

按日期在临时表中插入最后一个非空值

来自分类Dev

Elasticsearch:按字段排序,最后一个空值

来自分类Dev

用下一个递增数字填充空值| PySpark | 蟒蛇

来自分类Dev

Javascript rxjs - 当最后一个值从 observable 发出时做一些事情

来自分类Dev

返回第一个非空/空白值?

来自分类Dev

返回第一个非空/空白值?

来自分类Dev

快速为Generic类分配一些空值

来自分类Dev

SQL-空值应与另一个表的非空值匹配

来自分类Dev

mysql-基于空和空值的排序列表,列中的最后一个

来自分类Dev

Excel-查找单元格,导航到左侧的列,获取该列的最后一个非空值

来自分类Dev

如果最后一个和下一个非 NA 值相同,则替换 NA 值

来自分类Dev

使用数组中的一些数据,如果该值为空,则使用另一个数组中的数据

来自分类Dev

用联接中的最后一个可用值替换零值

来自分类Dev

返回一个空值的类

来自分类Dev

Excel公式获取行中的第一个和最后一个非空值并返回列标题

来自分类Dev

用数据帧中连续行中的第一个值替换最后一个值

来自分类Dev

用最后一个非零值填充1d numpy数组的零值

Related 相关文章

  1. 1

    用最后一个非空值填充空值-Oracle SQL

  2. 2

    用最后一个非 NaN 值替换 NaN

  3. 3

    将空值设置为列表中最接近的最后一个非空值-LINQ

  4. 4

    用 Amibroker 中的最后一个非零值替换数组的零

  5. 5

    MDX查询以查找icCube中的最后一个非空值

  6. 6

    SQL(Postgres)使用最后一个非空天的值填写缺少的日期

  7. 7

    Python数据框获取每行最后一个非空列的值

  8. 8

    打印awk中最后一个非空列的值

  9. 9

    返回以值为条件的最后一个非空单元格

  10. 10

    BigQuery-选择序列中的最后一个非空值

  11. 11

    如何在usql窗口表达式中获取最后一个非空值?

  12. 12

    如何在关系表中获取非空的最后一个值

  13. 13

    按日期在临时表中插入最后一个非空值

  14. 14

    Elasticsearch:按字段排序,最后一个空值

  15. 15

    用下一个递增数字填充空值| PySpark | 蟒蛇

  16. 16

    Javascript rxjs - 当最后一个值从 observable 发出时做一些事情

  17. 17

    返回第一个非空/空白值?

  18. 18

    返回第一个非空/空白值?

  19. 19

    快速为Generic类分配一些空值

  20. 20

    SQL-空值应与另一个表的非空值匹配

  21. 21

    mysql-基于空和空值的排序列表,列中的最后一个

  22. 22

    Excel-查找单元格,导航到左侧的列,获取该列的最后一个非空值

  23. 23

    如果最后一个和下一个非 NA 值相同,则替换 NA 值

  24. 24

    使用数组中的一些数据,如果该值为空,则使用另一个数组中的数据

  25. 25

    用联接中的最后一个可用值替换零值

  26. 26

    返回一个空值的类

  27. 27

    Excel公式获取行中的第一个和最后一个非空值并返回列标题

  28. 28

    用数据帧中连续行中的第一个值替换最后一个值

  29. 29

    用最后一个非零值填充1d numpy数组的零值

热门标签

归档