在Hadoop中,只需使用带有CompositeInputFormat的地图端连接,就可以完成大型等分区数据集的连接/合并,而无需进行重新组合和减少阶段。
试图找出在Spark中执行此操作的方法:
val x = sc.parallelize(Seq(("D", 1), ("C", 2), ("B", 3), ("A", 4))).toDF("k", "v")
.repartition(col("k")).cache()
val y = sc.parallelize(Seq(("F", 5), ("E", 6), ("D", 7), ("C", 8))).toDF("k", "v")
.repartition(col("k")).cache()
val xy = x.join(y, x.col("k") === y.col("k"), "outer")
x.show() y.show() xy.show()
+---+---+ +---+---+ +----+----+----+----+
| k| v| | k| v| | k| v| k| v|
+---+---+ +---+---+ +----+----+----+----+
| A| 6| | C| 12| | A| 4|null|null|
| B| 5| | D| 11| | B| 3|null|null|
| C| 4| | E| 10| | C| 2| C| 8|
| D| 3| | F| 9| | D| 1| D| 7|
| E| 2| | G| 8| |null|null| E| 6|
| F| 1| | H| 7| |null|null| F| 5|
+---+---+ +---+---+ +----+----+----+----+
到目前为止,一切都很好。但是,当我检查执行计划时,会看到“不必要的”排序:
xy.explain
== Physical Plan ==
SortMergeOuterJoin [k#1283], [k#1297], FullOuter, None
:- Sort [k#1283 ASC], false, 0
: +- InMemoryColumnarTableScan [k#1283,v#1284], InMemoryRelation [k#1283,v#1284], true, 10000, StorageLevel(true, true, false, true, 1), TungstenExchange hashpartitioning(k#1283,200), None, None
+- Sort [k#1297 ASC], false, 0
+- InMemoryColumnarTableScan [k#1297,v#1298], InMemoryRelation [k#1297,v#1298], true, 10000, StorageLevel(true, true, false, true, 1), TungstenExchange hashpartitioning(k#1297,200), None, None
是否可以在这里避免排序?
编辑
作为参考,Hadoop自2007年以来具有此“功能”:https : //issues.apache.org/jira/browse/HADOOP-2085
更新
正如Lezzar指出的那样,仅repartition()不足以实现均分的排序状态。我认为现在需要在sortWithinPartitions()之后,这样就可以解决问题:
val x = sc.parallelize(Seq(("F", 1), ("E", 2), ("D", 3), ("C", 4), ("B", 5), ("A", 6))).toDF("k", "v")
.repartition(col("k")).sortWithinPartitions(col("k")).cache()
val y = sc.parallelize(Seq(("H", 7), ("G", 8), ("F", 9), ("E",10), ("D",11), ("C",12))).toDF("k", "v")
.repartition(col("k")).sortWithinPartitions(col("k")).cache()
xy.explain()
== Physical Plan ==
SortMergeOuterJoin [k#1055], [k#1069], FullOuter, None
:- InMemoryColumnarTableScan [k#1055,v#1056], InMemoryRelation [k#1055,v#1056], true, 10000, StorageLevel(true, true, false, true, 1), Sort [k#1055 ASC], false, 0, None
+- InMemoryColumnarTableScan [k#1069,v#1070], InMemoryRelation [k#1069,v#1070], true, 10000, StorageLevel(true, true, false, true, 1), Sort [k#1069 ASC], false, 0, None
不再排序!
为什么要说不必要的排序?合并联接需要对数据进行排序。在IMHO中,没有比合并联接更好的策略来执行完整的外部联接,除非您的一个数据帧足够小以进行广播
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句