データフレーム「df1」を3列に分割したいと思います。このデータフレームには、これら3つの列に対して正確に990の固有の組み合わせがあります。
In [17]: df1.createOrReplaceTempView("df1_view")
In [18]: spark.sql("select count(*) from (select distinct(col1,col2,col3) from df1_view) as t").show()
+--------+
|count(1)|
+--------+
| 990|
+--------+
このデータフレームの処理を最適化するために、主要な可能性ごとに1つずつ、990個のパーティションを取得するためにdf1をパーティション化します。
In [19]: df1.rdd.getNumPartitions()
Out[19]: 24
In [20]: df2 = df1.repartition(990, "col1", "col2", "col3")
In [21]: df2.rdd.getNumPartitions()
Out[21]: 990
各パーティションの行を数える簡単な方法を書きました。
In [22]: def f(iterator):
...: a = 0
...: for partition in iterator:
...: a = a + 1
...: print(a)
...:
In [23]: df2.foreachPartition(f)
実際に取得するのは、1つ以上のキー値を持つ628個のパーティションと、362個の空のパーティションであることに気付きました。
Sparkは均等な方法(1つのキー値= 1つのパーティション)で再パーティション化されると想定しましたが、それはそうではないようです。この再パーティション化は、逆のはずなのにデータスキューを追加しているように感じます...
Sparkが列のデータフレームを分割するために使用するアルゴリズムは何ですか?私が可能だと思ったことを達成する方法はありますか?
ClouderaでSpark2.2.0を使用しています。
パーティション間でデータを分散するには、sparkは列の値をパーティションのインデックスに変換する必要があります。Sparkには、HashPartitionerとRangePartitionerの2つのデフォルトパーティショナーがあります。Sparkでのさまざまな変換により、さまざまなパーティショナーを適用できますjoin
。たとえば、ハッシュパーティショナーを適用します。
基本的に、ハッシュパーティショナー式が値をパーティションインデックスに変換する場合はvalue.hashCode() % numOfPartitions
。あなたの場合、複数の値が同じパーティションインデックスにマッピングされています。
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加