合并Spark中的等分区数据帧

尤吉斯

在Hadoop中,只需使用带有CompositeInputFormat的地图端连接,就可以完成大型等分区数据集的连接/合并,而无需进行重新组合和减少阶段。

试图找出在Spark中执行此操作的方法:

val x = sc.parallelize(Seq(("D", 1), ("C", 2), ("B", 3), ("A", 4))).toDF("k", "v")
    .repartition(col("k")).cache()
val y = sc.parallelize(Seq(("F", 5), ("E", 6), ("D", 7), ("C", 8))).toDF("k", "v")
    .repartition(col("k")).cache()

val xy = x.join(y, x.col("k") === y.col("k"), "outer")

x.show()    y.show()    xy.show()

+---+---+   +---+---+   +----+----+----+----+
|  k|  v|   |  k|  v|   |   k|   v|   k|   v|
+---+---+   +---+---+   +----+----+----+----+
|  A|  6|   |  C| 12|   |   A|   4|null|null|
|  B|  5|   |  D| 11|   |   B|   3|null|null|
|  C|  4|   |  E| 10|   |   C|   2|   C|   8|
|  D|  3|   |  F|  9|   |   D|   1|   D|   7|
|  E|  2|   |  G|  8|   |null|null|   E|   6|
|  F|  1|   |  H|  7|   |null|null|   F|   5|
+---+---+   +---+---+   +----+----+----+----+

到目前为止,一切都很好。但是,当我检查执行计划时,会看到“不必要的”排序:

xy.explain

== Physical Plan ==
SortMergeOuterJoin [k#1283], [k#1297], FullOuter, None
:- Sort [k#1283 ASC], false, 0
:  +- InMemoryColumnarTableScan [k#1283,v#1284], InMemoryRelation [k#1283,v#1284], true, 10000, StorageLevel(true, true, false, true, 1), TungstenExchange hashpartitioning(k#1283,200), None, None
+- Sort [k#1297 ASC], false, 0
   +- InMemoryColumnarTableScan [k#1297,v#1298], InMemoryRelation [k#1297,v#1298], true, 10000, StorageLevel(true, true, false, true, 1), TungstenExchange hashpartitioning(k#1297,200), None, None

是否可以在这里避免排序?

编辑

作为参考,Hadoop自2007年以来具有此“功能”:https : //issues.apache.org/jira/browse/HADOOP-2085

更新

正如Lezzar指出的那样,仅repartition()不足以实现均分的排序状态。我认为现在需要在sortWithinPartitions()之后,这样就可以解决问题:

val x = sc.parallelize(Seq(("F", 1), ("E", 2), ("D", 3), ("C", 4), ("B", 5), ("A", 6))).toDF("k", "v")
    .repartition(col("k")).sortWithinPartitions(col("k")).cache()
val y = sc.parallelize(Seq(("H", 7), ("G", 8), ("F", 9), ("E",10), ("D",11), ("C",12))).toDF("k", "v")
    .repartition(col("k")).sortWithinPartitions(col("k")).cache()

xy.explain()

== Physical Plan ==
SortMergeOuterJoin [k#1055], [k#1069], FullOuter, None
:- InMemoryColumnarTableScan [k#1055,v#1056], InMemoryRelation [k#1055,v#1056], true, 10000, StorageLevel(true, true, false, true, 1), Sort [k#1055 ASC], false, 0, None
+- InMemoryColumnarTableScan [k#1069,v#1070], InMemoryRelation [k#1069,v#1070], true, 10000, StorageLevel(true, true, false, true, 1), Sort [k#1069 ASC], false, 0, None

不再排序!

勒扎尔·瓦利德(Lezzar Walid)

为什么要说不必要的排序?合并联接需要对数据进行排序。在IMHO中,没有比合并联接更好的策略来执行完整的外部联接,除非您的一个数据帧足够小以进行广播

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

重新分区更改Spark中数据帧的行顺序

来自分类Dev

如何确定Apache Spark数据帧中的分区大小

来自分类Dev

使用Scala和Python API合并Spark数据帧时使用不同的分区号

来自分类Dev

如何在Spark数据帧中合并数组列

来自分类Dev

无法在 Scala Spark 中合并两个数据帧

来自分类Dev

基于Scala中的关键列合并Spark数据帧行

来自分类Dev

在R中合并数据帧

来自分类Dev

spark scala 数据帧合并多个数据帧

来自分类Dev

基于scala spark中的键合并数据帧中的多条记录

来自分类Dev

如何在Python中将Spark数据帧中的所有列值合并为String?

来自分类Dev

Spark 合并两个数据帧并通过覆盖第二个数据帧中的值来创建单个数据帧

来自分类Dev

从数据帧列表中合并for循环中的数据帧

来自分类Dev

合并存储在列表中的数据帧

来自分类Dev

R中的合并和交织数据帧

来自分类Dev

在R中按计数合并数据帧

来自分类Dev

在python中的for循环中合并数据帧

来自分类Dev

在 R 中合并数据帧的列

来自分类Dev

Spark分区中的数据何时真正实现?

来自分类Dev

在Apache Spark中控制数据分区

来自分类Dev

合并分区并保留数据

来自分类Dev

在Apache Spark中拆分数据帧

来自分类Dev

在spark中为minHashLSH转换数据帧

来自分类Dev

如何在Spark中释放数据帧?

来自分类Dev

spark scala中的数据帧映射?

来自分类Dev

如何在不使用合并的情况下在本地系统的单个文件中写入spark数据帧

来自分类Dev

如何删除数据帧中的空分区?

来自分类Dev

与dplyr合并数据帧

来自分类Dev

合并数据帧的问题

来自分类Dev

Scala:数据帧合并