我有以下数据框:
+---------------+--------------+--------------+-----+
| column0| column1| column2|label|
+---------------+--------------+--------------+-----+
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001| 2|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880| 2|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001| 2|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001| 2|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880| 2|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001| 2|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001| 2|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880| 2|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001| 2|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001| 2|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880| 2|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001| 2|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001| 2|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880| 2|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001| 2|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001| 2|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880| 2|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001| 2|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001| 2|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880| 2|
+---------------+--------------+--------------+-----+
我想应用 groupBy 并依靠它并得出以下结果:
+--------------+--------------+-----+
| column1| column2|count|
+--------------+--------------+-----+
|10.0.0.2.54880| 10.0.0.3.5001| 19|
| 10.0.0.3.5001|10.0.0.2.54880| 10|
+--------------+--------------+-----+
我知道我必须使用这个:
dataFrame_Train.groupBy("column1", "column2").count().show()
但是问题是我需要将“计数”列作为永久列添加到我的数据框中。在上述情况下,如果我dataFrame_Train.show()
在 之后使用groupBy
,我会看到没有“计数”列的第一个数据帧。这段代码:
dataFrame_Train.groupBy("column1", "column2").count().show()
`dataFrame_Train.show()`
你能帮我添加groupBy("column1", "column2").count()
到数据框吗?(因为我将来需要使用“计数”列来训练数据)谢谢。
@eliasah 的回答很好,但可能不是最有效的、代码和性能方面的。
每当您看到需要groupBy
and 时join
,尤其是。对于像这样的简单用例,请考虑窗口聚合函数。
之间的主要区别groupBy
和窗口聚集之处在于前者给你最多的行数在源数据集,而后者(窗口集合体)为您提供准确的行数与源数据集。这似乎完全符合您的要求,不是吗?
有了这个,让我们看看代码。
import org.apache.spark.sql.expressions.Window
val columns1and2 = Window.partitionBy("column1", "column2") // <-- matches groupBy
import org.apache.spark.sql.functions._
// using count aggregate function over entire partition frame
val counts = ips.withColumn("count", count($"label") over columns1and2)
scala> counts.show
+---------------+--------------+--------------+-----+-----+
| column0| column1| column2|label|count|
+---------------+--------------+--------------+-----+-----+
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001| 2| 13|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001| 2| 13|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001| 2| 13|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001| 2| 13|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001| 2| 13|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001| 2| 13|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001| 2| 13|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001| 2| 13|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001| 2| 13|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001| 2| 13|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001| 2| 13|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001| 2| 13|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001| 2| 13|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880| 2| 7|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880| 2| 7|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880| 2| 7|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880| 2| 7|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880| 2| 7|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880| 2| 7|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880| 2| 7|
+---------------+--------------+--------------+-----+-----+
完毕!干净又方便。这就是我心爱的窗口聚合函数!
有趣的来了。这和@eliasah 的解决方案之间的区别只是纯粹的语法吗?我不这么认为(但我仍在学习如何得出正确的结论)。查看执行计划并自行判断。
下面是窗口聚合的执行计划。
然而,以下是执行计划groupBy
和join
(我不得不把两个截图的计划是太大的一个包括)。
Job-wisegroupBy
和join
query 轻松击败了窗口聚合,前者有 2 个 Spark 作业,后者有 5 个。
操作员方面,它们的数量和最重要的交换(它们是 Spark SQL 的 shuffle),窗口聚合可能已经击败groupBy
了join
.
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句