pyspark collect_set在groupby之外的列

奥斯卡·弗利

我正在尝试使用collect_set获取属于groupby的categorie_names字符串列表我的代码是

from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql import functions as F

sc = SparkContext("local")
sqlContext = HiveContext(sc)
df = sqlContext.createDataFrame([
     ("1", "cat1", "Dept1", "product1", 7),
     ("2", "cat2", "Dept1", "product1", 100),
     ("3", "cat2", "Dept1", "product2", 3),
     ("4", "cat1", "Dept2", "product3", 5),
    ], ["id", "category_name", "department_id", "product_id", "value"])

df.show()
df.groupby("department_id", "product_id")\
    .agg({'value': 'sum'}) \
    .show()

#            .agg( F.collect_set("category_name"))\

输出是

+---+-------------+-------------+----------+-----+
| id|category_name|department_id|product_id|value|
+---+-------------+-------------+----------+-----+
|  1|         cat1|        Dept1|  product1|    7|
|  2|         cat2|        Dept1|  product1|  100|
|  3|         cat2|        Dept1|  product2|    3|
|  4|         cat1|        Dept2|  product3|    5|
+---+-------------+-------------+----------+-----+

+-------------+----------+----------+
|department_id|product_id|sum(value)|
+-------------+----------+----------+
|        Dept1|  product2|         3|
|        Dept1|  product1|       107|
|        Dept2|  product3|         5|
+-------------+----------+----------+

我想要这个输出

+-------------+----------+----------+----------------------------+
|department_id|product_id|sum(value)| collect_list(category_name)|
+-------------+----------+----------+----------------------------+
|        Dept1|  product2|         3|  cat2                      |
|        Dept1|  product1|       107|  cat1, cat2                |
|        Dept2|  product3|         5|  cat1                      |
+-------------+----------+----------+----------------------------+

尝试1

df.groupby("department_id", "product_id")\
    .agg({'value': 'sum'}) \
    .agg(F.collect_set("category_name")) \
    .show()

我收到此错误:

pyspark.sql.utils.AnalysisException:“无法解析' category_name'给定的输入列:[department_id,product_id,sum(value)] ;; \ n'Aggregate [collect_set('category_name,0,0)AS collect_set(category_name)#35 ] \ n +-汇总[部门ID#2,产品ID#3],[部门ID#2,产品ID#3,总和(值#4L)AS总和(值)#24L] \ n +-逻辑RDD [ID#0,类别名称# 1,department_id#2,product_id#3,value#4L] \ n“

尝试2我将category_name作为groupby的一部分

df.groupby("category_name", "department_id", "product_id")\
    .agg({'value': 'sum'}) \
    .agg(F.collect_set("category_name")) \
    .show()

它有效,但输出不正确

+--------------------------+
|collect_set(category_name)|
+--------------------------+
|              [cat1, cat2]|
+--------------------------+
保利

您可以在内指定多个聚合agg()适用于您的情况的正确语法为:

df.groupby("department_id", "product_id")\
    .agg(F.sum('value'), F.collect_set("category_name"))\
    .show()
#+-------------+----------+----------+--------------------------+
#|department_id|product_id|sum(value)|collect_set(category_name)|
#+-------------+----------+----------+--------------------------+
#|        Dept1|  product2|         3|                    [cat2]|
#|        Dept1|  product1|       107|              [cat1, cat2]|
#|        Dept2|  product3|         5|                    [cat1]|
#+-------------+----------+----------+--------------------------+

您的方法无效,因为第一个方法.agg()适用于pyspark.sql.group.GroupedData并返回一个新的DataFrame。随后的呼叫agg实际上pyspark.sql.DataFrame.agg

简写 df.groupBy.agg()

因此,实质上,第二个呼叫agg是再次分组,这不是您想要的。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章