Sparkデータフレームを再パーティション化するときに、なぜこれほど多くの空のパーティションを取得するのですか?

Tomcat

データフレーム「df1」を3列に分割したいと思います。このデータフレームには、これら3つの列に対して正確に990の固有の組み合わせがあります。

In [17]: df1.createOrReplaceTempView("df1_view")

In [18]: spark.sql("select count(*) from (select distinct(col1,col2,col3) from df1_view) as t").show()
+--------+                                                                      
|count(1)|
+--------+
|     990|
+--------+

このデータフレームの処理を最適化するために、主要な可能性ごとに1つずつ、990個のパーティションを取得するためにdf1をパーティション化します。

In [19]: df1.rdd.getNumPartitions()
Out[19]: 24

In [20]: df2 = df1.repartition(990, "col1", "col2", "col3")

In [21]: df2.rdd.getNumPartitions()
Out[21]: 990

各パーティションの行を数える簡単な方法を書きました。

In [22]: def f(iterator):
    ...:     a = 0
    ...:     for partition in iterator:
    ...:         a = a + 1
    ...:     print(a)
    ...: 

In [23]: df2.foreachPartition(f)

実際に取得するのは、1つ以上のキー値を持つ628個のパーティションと、362個の空のパーティションであることに気付きました。

Sparkは均等な方法(1つのキー値= 1つのパーティション)で再パーティション化されると想定しましたが、それはそうではないようです。この再パーティション化は、逆のはずなのにデータスキューを追加しているように感じます...

Sparkが列のデータフレームを分割するために使用するアルゴリズムは何ですか?私が可能だと思ったことを達成する方法はありますか?

ClouderaでSpark2.2.0を使用しています。

Vladislav Varslavans

パーティション間でデータを分散するには、sparkは列の値をパーティションのインデックスに変換する必要があります。Sparkには、HashPartitionerとRangePartitionerの2つのデフォルトパーティショナーがあります。Sparkでのさまざまな変換により、さまざまなパーティショナーを適用できますjoinたとえば、ハッシュパーティショナーを適用します。

基本的に、ハッシュパーティショナー式が値をパーティションインデックスに変換する場合はvalue.hashCode() % numOfPartitionsあなたの場合、複数の値が同じパーティションインデックスにマッピングされています。

より良い配布が必要な場合は、独自のパーティショナーを実装できます。それについての詳細はここここここにあります

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

Related 関連記事

ホットタグ

アーカイブ