私は2つのPySparkのデータフレームを持っていると言うdf1
とdf2
。
df1= 'a'
1
2
5
df2= 'b'
3
6
そしてdf2['b']
、それぞれdf1['a']
に最も近い値を見つけて、の新しい列として最も近い値を追加したいと思いますdf1
。
言い換えれば、各値のためx
にdf1['a']
私が見つけたい、y
その達成せmin(abx(x-y))
、すべてのためのy in df2['b']
(1つだけが存在することを想定することができるノートがy
最小距離を達成することができます)、その結果は次のようになり
'a' 'b'
1 3
2 3
5 6
次のコードを試して、最初に距離行列を作成しました(最小距離を達成する値を見つける前に)。
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
def dict(x,y):
return abs(x-y)
udf_dict = udf(dict, IntegerType())
sql_sc = SQLContext(sc)
udf_dict(df1.a, df2.b)
これは
Column<PythonUDF#dist(a,b)>
そして、私が試しました
sql_sc.CreateDataFrame(udf_dict(df1.a, df2.b))
エラー/出力を与えることなく永久に実行されます。
私の質問は以下のとおりです。
a
とb
値の距離行列を作成してから、min
1つを見つけることです)あなたの2番目の質問で開始 - あなただけの既存のデータフレームにUDFを適用することができ、私はあなたがこのような何かを考えていたと思います:
>>> df1.join(df2).withColumn('distance', udf_dict(df1.a, df2.b)).show()
+---+---+--------+
| a| b|distance|
+---+---+--------+
| 1| 3| 2|
| 1| 6| 5|
| 2| 3| 1|
| 2| 6| 4|
| 5| 3| 2|
| 5| 6| 1|
+---+---+--------+
しかし、内部使用することにより、この距離を適用するためのより効率的な方法がありますabs
:
>>> from pyspark.sql.functions import abs
>>> df1.join(df2).withColumn('distance', abs(df1.a -df2.b))
次に、以下を計算して一致する番号を見つけることができます。
>>> distances = df1.join(df2).withColumn('distance', abs(df1.a -df2.b))
>>> min_distances = distances.groupBy('a').agg(min('distance').alias('distance'))
>>> distances.join(min_distances, ['a', 'distance']).select('a', 'b').show()
+---+---+
| a| b|
+---+---+
| 5| 6|
| 1| 3|
| 2| 3|
+---+---+
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加