Pythonを使用してSparkで基本的な結合をどのように実行しますか?Rでは、これを行うためにmerg()を使用できます。スパークでPythonを使用する構文は何ですか?
2つのテーブル(RDD)があり、それぞれに共通のキーを持つ単一の列があります。
RDD(1):(key,U)
RDD(2):(key,V)
内部結合は次のようなものだと思います。
rdd1.join(rdd2).map(case (key, u, v) => (key, ls ++ rs));
そうですか?インターネットを検索しましたが、結合の良い例が見つかりません。前もって感謝します。
これは、PairRDDFunctions
またはSparkデータフレームを使用して実行できます。データフレームの操作にはCatalystオプティマイザのメリットがあるため、2番目のオプションは検討に値します。
データが次のようになっているとします。
rdd1 = sc.parallelize([("foo", 1), ("bar", 2), ("baz", 3)])
rdd2 = sc.parallelize([("foo", 4), ("bar", 5), ("bar", 6)])
内部結合:
rdd1.join(rdd2)
左外部結合:
rdd1.leftOuterJoin(rdd2)
デカルト積(必須ではありませんRDD[(T, U)]
):
rdd1.cartesian(rdd2)
ブロードキャスト参加(必須ではありませんRDD[(T, U)]
):
最後に、cogroup
SQLに直接対応するものはありませんが、状況によっては役立つ場合があります。
cogrouped = rdd1.cogroup(rdd2)
cogrouped.mapValues(lambda x: (list(x[0]), list(x[1]))).collect()
## [('foo', ([1], [4])), ('bar', ([2], [5, 6])), ('baz', ([3], []))]
SQL DSLを使用するか、を使用して生のSQLを実行できますsqlContext.sql
。
df1 = spark.createDataFrame(rdd1, ('k', 'v1'))
df2 = spark.createDataFrame(rdd2, ('k', 'v2'))
# Register temporary tables to be able to use `sparkSession.sql`
df1.createOrReplaceTempView('df1')
df2.createOrReplaceTempView('df2')
内部結合:
# inner is a default value so it could be omitted
df1.join(df2, df1.k == df2.k, how='inner')
spark.sql('SELECT * FROM df1 JOIN df2 ON df1.k = df2.k')
左外部結合:
df1.join(df2, df1.k == df2.k, how='left_outer')
spark.sql('SELECT * FROM df1 LEFT OUTER JOIN df2 ON df1.k = df2.k')
クロスジョイン(Spark。2.0ではSpark.2.0 - spark.sql.crossJoin.enabledで明示的なクロスジョインまたは構成の変更が必要です):
df1.crossJoin(df2)
spark.sql('SELECT * FROM df1 CROSS JOIN df2')
df1.join(df2)
sqlContext.sql('SELECT * FROM df JOIN df2')
1.6(Scalaでは1.5)以降、これらはそれぞれbroadcast
関数と組み合わせることができます。
from pyspark.sql.functions import broadcast
df1.join(broadcast(df2), df1.k == df2.k)
ブロードキャスト参加を実行します。BroadcastHashJoinがSparkのShuffledHashJoinより遅い理由も参照してください。
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加