で、この前の質問、私がしようとしていたメモリの問題を回避スパークでのjoin
使用を避けることでjoin
。
この新しい質問では、を使用join
していますが、メモリの問題を修正しようとしています。
これらは私の2つのRDDです:
productToCustomerRDD:
サイズ:非常に大規模では、個別のキーを数百万人の持っている可能性
を持つキーでパーティション化HashPartitioner
される一部のキーの高い重複をし、いくつかはしません。
(toast, John)
(butter, John)
(toast, Jane)
(jelly, Jane)
productToCountRDD:
サイズ:非常に大きく、数百万の個別のキーが含まれる可能性があり、大きすぎbroadcast
てキーを使用してHashPartitioner
キーを分割することは一意です。値は、製品を購入した顧客の数です。
(toast, 2)
(butter, 1)
(jelly, 1)
これら2つのRDDに参加したいのですが、結果は次のようになります。
customerToProductAndCountRDD:
(toast, (John, 2))
(butter, (John, 1))
(toast, (Jane, 2))
(jelly, (Jane, 1))
私は2 RDDSに参加する場合はproductToCustomerRDD.join(productToCountRDD)
、私が得るOutOfMemoryError
2つのパーティション(数千人のうち)に。Spark UIjoin
で、Input Size / Records
列に、を含む段階で、すべてのパーティションに4Kから700Kまでのレコード数があることに気付きました。OOMを生成した2つのパーティションを除くすべて。1つは900万レコード、もう1つは600万レコードです。
私が理解しているように、参加するには、同じキーを持つペアをシャッフルして同じパーティションに移動する必要があります(以前にキーでパーティション化されていた場合を除く)。ただし、一部のキーは非常に頻繁に使用されるため(たとえば、データセット内のほぼすべての顧客が購入した製品)、結合join
中またはrepartition
結合直前のいずれかに、大量のデータが1つのパーティションに移動される場合があります。
私はこれを正しく理解していますか?
これを回避する方法はありますか?同じパーティションに1つの高度に複製されたキーのすべてのデータ
がjoin
ないようにする方法はありますか?
実際、これは「スキュー結合」と呼ばれるSparkの標準的な問題です。結合の片側が歪んでいるため、一部のキーが他のキーよりもはるかに頻繁に使用されます。私にとってうまくいかなかったいくつかの答えはここで見つけることができます。
私が使用した戦略は、ここでGraphFrame.skewedJoin()
定義された方法とここでの使用に触発されていConnectedComponents.skewedJoin()
ます。結合は、ブロードキャスト結合を使用して最も頻度の高いキーを結合し、標準の結合を使用して頻度の低いキーを結合することによって実行されます。
私の例(OP)では、productToCountRDD
すでにキー周波数に関する情報が含まれています。したがって、次のようになります。
productToCountRDD
固定しきい値を超えるカウントとcollectAsMap()
ドライバーのみを保持するようにフィルター処理します。productToCustomerRDD
2つのRDDに分割します。ブロードキャストマップにあるキー(頻繁なキー)とそうでないキー(まれなキー)です。mapToPair
取得してで実行count
されますjoin
ます。union
最後に使用して、完全なRDDを取得します。この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加