Pyspark:udfを使用して、別のデータフレームの値に基づいてデータフレームに新しい列を追加します

アキラ

2つのpysparkデータフレームがありdf_2、dataframe_1の値に基づいてdataframe_2()に新しい列を追加しようとしています。Dataframe_2列でval_1ありval_2、dataframe_1の行と列の位置である必要があります。

dataframe_1

df_1 = sqlContext.createDataFrame([(0.78, 0.79, 0.45, 0.67, 0.88), (0.77, 0.79, 0.81, 0.82, 0.66), (0.99, 0.92, 0.94, 0.95, 0.91), (
    0.75, 0.53, 0.83, 0.73, 0.56), (0.77, 0.78, 0.99, 0.34, 0.67)], ["col_1", "col_2", "col_3", "col_4", "col_5"])

df_1.show()
+-----+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|col_5|
+-----+-----+-----+-----+-----+
| 0.78| 0.79| 0.45| 0.67| 0.88|
| 0.77| 0.79| 0.81| 0.82| 0.66|
| 0.99| 0.92| 0.94| 0.95| 0.91|
| 0.75| 0.53| 0.83| 0.73| 0.56|
| 0.77| 0.78| 0.99| 0.34| 0.67|
+-----+-----+-----+-----+-----+

dataframe_2

df_2 = sqlContext.createDataFrame([(34563, 435353424, 1, 2 ), (23524, 466344656, 2, 1), (52452, 263637236, 2, 5), (
   52334, 466633353, 2, 3), (66334, 563555578, 5, 4), (42552, 123445563, 5, 3), (72331, 413555213, 4, 3), (82311, 52355563, 2, 2)], ["id", "col_A", "val_1", "val_2"])
df_2.show()
+-----+---------+-----+-----+
|   id|    col_A|val_1|val_2|
+-----+---------+-----+-----+
|34563|435353424|    1|    2|
|23524|466344656|    2|    1|
|52452|263637236|    2|    5|
|52334|466633353|    2|    3|
|66334|563555578|    5|    4|
|42552|123445563|    5|    3|
|72331|413555213|    4|    3|
|82311| 52355563|    2|    2|
+-----+---------+-----+-----+

目標df_2の値に基づいて新しい列を追加するdf_1

udfの作成を使用しようとしましたが、エラーが発生しました。

期待される出力:

+-----+---------+-----+-----+---------------+
|   id|    col_A|val_1|val_2|value_from_df_1|
+-----+---------+-----+-----+---------------+
|34563|435353424|    1|    2|           0.79|
|23524|466344656|    2|    1|           0.77|
|52452|263637236|    2|    5|           0.66|
|52334|466633353|    2|    3|           0.94|
|66334|563555578|    5|    4|           0.34|
|42552|123445563|    5|    3|           0.99|
|72331|413555213|    4|    3|           0.83|
|82311| 52355563|    2|    2|           0.79|
+-----+---------+-----+-----+---------------+

私のコード:

from pyspark.sql import functions as F
import pyspark.sql.types as t

def add_data_to_table(table, value_1, value_2):
    return float(table.collect()[value_1-1][value_2-1])

select_data_from_table = F.udf(add_data_to_table, t.FloatType())
result_df = df_2.withColumn('value_from_df_1', select_data_from_table(df_1, df_2.val_1, df_2.val_2 ))
result_df.show()

誰かが助けてくれたら本当に感謝しています。ありがとうございました。

mck

パンダとは異なり、Sparkにはインデックスの概念がないため、手動でインデックスを追加する必要があります。UDFはデータフレーム全体ではなく、行ごとに動作するため、ここではUDFは適切ではありません。

from pyspark.sql import functions as F, Window

df_1_id = df_1.withColumn(
    'row',
    F.row_number().over(Window.orderBy(F.monotonically_increasing_id()))
).select(
    'row',
    F.posexplode(F.array(*df_1.columns))
)

result = df_2.withColumn(
    'rowid',
    F.monotonically_increasing_id()
).join(
    df_1_id,
    (df_1_id.row == df_2.val_1) & (df_1_id.pos + 1 == df_2.val_2)
).orderBy('rowid').drop('rowid', 'row', 'pos')

result.show()
+-----+---------+-----+-----+----+
|   id|    col_A|val_1|val_2| col|
+-----+---------+-----+-----+----+
|34563|435353424|    1|    2|0.79|
|23524|466344656|    2|    1|0.77|
|52452|263637236|    2|    5|0.66|
|52334|466633353|    2|    3|0.81|
|66334|563555578|    5|    4|0.34|
|42552|123445563|    5|    3|0.99|
|72331|413555213|    4|    3|0.83|
|82311| 52355563|    2|    2|0.79|
+-----+---------+-----+-----+----+

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

分類Dev

パンダ:別のデータフレームの値に基づいて、データフレームに新しい列を追加します

分類Dev

行の最初の値に基づいて、データフレームに新しい列を追加します

分類Dev

Pysparkデータフレーム:別の列の値に基づいて列を抽出します

分類Dev

パンダのデータフレームの別の列の値に基づいて列を追加します

分類Dev

条件に基づいて別のデータフレームの値からデータフレームに新しい列を追加する

分類Dev

他の列に基づいてpysparkデータフレームに新しい列を追加する

分類Dev

別のデータフレームの値に基づいてデータフレームを更新します

分類Dev

他の列の結果に基づいて、データフレームに新しい列を追加します

分類Dev

値に基づいてpysparkデータフレームに新しい行を追加します

分類Dev

既存の列の値に基づいてpandasデータフレームに新しい列を追加する

分類Dev

他の列の値に基づいてデータフレームに新しい列を追加する

分類Dev

複数の列の値に基づいてデータフレームに新しい列を追加する

分類Dev

Pyspark:1つの列の値に基づいて、あるデータフレームを別のデータフレームから減算します

分類Dev

dplyrを使用して、他のデータフレームに基づいて新しい列を追加する

分類Dev

別のデータフレームに基づいて新しいデータフレームを作成する

分類Dev

Pyspark-異なるデータフレームの値に基づいてデータフレームに列を追加します

分類Dev

Rの条件に基づいて、データフレームに複数の新しい列を追加します

分類Dev

列の複数の値に基づいてデータフレームに新しい行を作成します

分類Dev

別のデータフレームの日付条件に基づいて新しい列を作成します

分類Dev

複数の列の値に基づいて新しいデータフレーム列を作成します

分類Dev

条件に基づいて、データフレーム列の値を別の列の値に変更します

分類Dev

R-別のデータフレームの一致する値を使用して、データフレームに新しい列を追加します

分類Dev

groupby 値に基づいて pandas データフレームに新しい列を追加します

分類Dev

Rの他のデータフレームに値が存在するかどうかに基づいて、新しい列に新しい値を追加します

分類Dev

リストとデータフレームに基づいて複数の条件を持つデータフレームに新しい列を追加します

分類Dev

Pythonは、別の列の条件に基づいてデータフレームに行を追加します

分類Dev

既存の列のカテゴリ値に基づいてデータフレームに列を追加します

分類Dev

他の列の値に基づいてデータフレームに列を追加します

分類Dev

他の列の値に基づいてデータフレームに列を追加します

Related 関連記事

  1. 1

    パンダ:別のデータフレームの値に基づいて、データフレームに新しい列を追加します

  2. 2

    行の最初の値に基づいて、データフレームに新しい列を追加します

  3. 3

    Pysparkデータフレーム:別の列の値に基づいて列を抽出します

  4. 4

    パンダのデータフレームの別の列の値に基づいて列を追加します

  5. 5

    条件に基づいて別のデータフレームの値からデータフレームに新しい列を追加する

  6. 6

    他の列に基づいてpysparkデータフレームに新しい列を追加する

  7. 7

    別のデータフレームの値に基づいてデータフレームを更新します

  8. 8

    他の列の結果に基づいて、データフレームに新しい列を追加します

  9. 9

    値に基づいてpysparkデータフレームに新しい行を追加します

  10. 10

    既存の列の値に基づいてpandasデータフレームに新しい列を追加する

  11. 11

    他の列の値に基づいてデータフレームに新しい列を追加する

  12. 12

    複数の列の値に基づいてデータフレームに新しい列を追加する

  13. 13

    Pyspark:1つの列の値に基づいて、あるデータフレームを別のデータフレームから減算します

  14. 14

    dplyrを使用して、他のデータフレームに基づいて新しい列を追加する

  15. 15

    別のデータフレームに基づいて新しいデータフレームを作成する

  16. 16

    Pyspark-異なるデータフレームの値に基づいてデータフレームに列を追加します

  17. 17

    Rの条件に基づいて、データフレームに複数の新しい列を追加します

  18. 18

    列の複数の値に基づいてデータフレームに新しい行を作成します

  19. 19

    別のデータフレームの日付条件に基づいて新しい列を作成します

  20. 20

    複数の列の値に基づいて新しいデータフレーム列を作成します

  21. 21

    条件に基づいて、データフレーム列の値を別の列の値に変更します

  22. 22

    R-別のデータフレームの一致する値を使用して、データフレームに新しい列を追加します

  23. 23

    groupby 値に基づいて pandas データフレームに新しい列を追加します

  24. 24

    Rの他のデータフレームに値が存在するかどうかに基づいて、新しい列に新しい値を追加します

  25. 25

    リストとデータフレームに基づいて複数の条件を持つデータフレームに新しい列を追加します

  26. 26

    Pythonは、別の列の条件に基づいてデータフレームに行を追加します

  27. 27

    既存の列のカテゴリ値に基づいてデータフレームに列を追加します

  28. 28

    他の列の値に基づいてデータフレームに列を追加します

  29. 29

    他の列の値に基づいてデータフレームに列を追加します

ホットタグ

アーカイブ