I am trying to use collect_set to get a list of the category_name strings, a column that is not part of the groupby. My code is
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql import functions as F
sc = SparkContext("local")
sqlContext = HiveContext(sc)
df = sqlContext.createDataFrame([
("1", "cat1", "Dept1", "product1", 7),
("2", "cat2", "Dept1", "product1", 100),
("3", "cat2", "Dept1", "product2", 3),
("4", "cat1", "Dept2", "product3", 5),
], ["id", "category_name", "department_id", "product_id", "value"])
df.show()
df.groupby("department_id", "product_id")\
.agg({'value': 'sum'}) \
.show()
# .agg( F.collect_set("category_name"))\
The output is
+---+-------------+-------------+----------+-----+
| id|category_name|department_id|product_id|value|
+---+-------------+-------------+----------+-----+
| 1| cat1| Dept1| product1| 7|
| 2| cat2| Dept1| product1| 100|
| 3| cat2| Dept1| product2| 3|
| 4| cat1| Dept2| product3| 5|
+---+-------------+-------------+----------+-----+
+-------------+----------+----------+
|department_id|product_id|sum(value)|
+-------------+----------+----------+
| Dept1| product2| 3|
| Dept1| product1| 107|
| Dept2| product3| 5|
+-------------+----------+----------+
I want this output
+-------------+----------+----------+----------------------------+
|department_id|product_id|sum(value)| collect_list(category_name)|
+-------------+----------+----------+----------------------------+
| Dept1| product2| 3| cat2 |
| Dept1| product1| 107| cat1, cat2 |
| Dept2| product3| 5| cat1 |
+-------------+----------+----------+----------------------------+
Attempt 1
df.groupby("department_id", "product_id")\
.agg({'value': 'sum'}) \
.agg(F.collect_set("category_name")) \
.show()
I get this error:
pyspark.sql.utils.AnalysisException: "cannot resolve '`category_name`' given input columns: [department_id, product_id, sum(value)];;\n'Aggregate [collect_set('category_name, 0, 0) AS collect_set(category_name)#35]\n+- Aggregate [department_id#2, product_id#3], [department_id#2, product_id#3, sum(value#4L) AS sum(value)#24L]\n +- LogicalRDD [id#0, category_name#1, department_id#2, product_id#3, value#4L]\n"
Attempt 2: I made category_name part of the groupby
df.groupby("category_name", "department_id", "product_id")\
.agg({'value': 'sum'}) \
.agg(F.collect_set("category_name")) \
.show()
It runs, but the output is not what I want
+--------------------------+
|collect_set(category_name)|
+--------------------------+
| [cat1, cat2]|
+--------------------------+
You can specify multiple aggregations inside a single agg() call. The correct syntax for your case is:
df.groupby("department_id", "product_id")\
.agg(F.sum('value'), F.collect_set("category_name"))\
.show()
#+-------------+----------+----------+--------------------------+
#|department_id|product_id|sum(value)|collect_set(category_name)|
#+-------------+----------+----------+--------------------------+
#| Dept1| product2| 3| [cat2]|
#| Dept1| product1| 107| [cat1, cat2]|
#| Dept2| product3| 5| [cat1]|
#+-------------+----------+----------+--------------------------+
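Your approach does not work because the first .agg() is called on a pyspark.sql.group.GroupedData object and returns a new DataFrame. That DataFrame contains only the grouping columns and sum(value), so category_name is no longer available. The subsequent call to agg is actually pyspark.sql.DataFrame.agg, which is shorthand for df.groupBy().agg(). So, in essence, the second call to agg groups again, this time over the whole DataFrame as a single group, which is not what you want.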
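To make the difference concrete without needing a Spark installation, here is a minimal plain-Python sketch of the two call patterns. It is only a model of the semantics described above, not how Spark is implemented: group_agg and collect_set are hypothetical helpers written for this illustration, and the sets are sorted purely so the result is deterministic (Spark's collect_set does not guarantee an order).

```python
from collections import defaultdict

# Same rows as the DataFrame in the question.
columns = ["id", "category_name", "department_id", "product_id", "value"]
rows = [dict(zip(columns, r)) for r in [
    ("1", "cat1", "Dept1", "product1", 7),
    ("2", "cat2", "Dept1", "product1", 100),
    ("3", "cat2", "Dept1", "product2", 3),
    ("4", "cat1", "Dept2", "product3", 5),
]]


def group_agg(rows, keys, aggs):
    """Hypothetical helper: group `rows` by `keys`, then apply every
    aggregation in `aggs` to each group.

    `aggs` maps an output column name to a (source column, function) pair.
    With keys == [] the whole input forms one group, which models
    DataFrame.agg being shorthand for df.groupBy().agg().
    """
    groups = defaultdict(list)
    for row in rows:
        groups[tuple(row[k] for k in keys)].append(row)
    result = []
    for key, members in groups.items():
        out = dict(zip(keys, key))
        for name, (column, func) in aggs.items():
            out[name] = func([m[column] for m in members])
        result.append(out)
    return result


def collect_set(values):
    # Sorted only for a deterministic result in this sketch.
    return sorted(set(values))


keys = ["department_id", "product_id"]

# One agg call with both aggregations: each group still sees category_name.
combined = group_agg(rows, keys, {
    "sum(value)": ("value", sum),
    "collect_set(category_name)": ("category_name", collect_set),
})

# Chained calls: the first result has only the keys and sum(value),
# so the second aggregation cannot find category_name.
summed = group_agg(rows, keys, {"sum(value)": ("value", sum)})
chained_error = None
try:
    group_agg(summed, [], {
        "collect_set(category_name)": ("category_name", collect_set),
    })
except KeyError as exc:
    chained_error = exc

for row in combined:
    print(row)
print("chained call failed on missing column:", chained_error)
```

The KeyError in the chained version plays the role of the AnalysisException from Attempt 1: by the time the second aggregation runs, the column it needs has already been dropped.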