値に基づいてpysparkデータフレームに新しい行を追加します

ラドー

私はこのようなデータフレームを持っています:

client_username|workstation|session_duration|access_point_name|start_date|
XX1@AD         |Apple      |1.55            |idf_1            |2019-06-01|
XX2@AD         |Apple      |30.12           |idf_2            |2019-06-04|
XX3@AD         |Apple      |78.25           |idf_3            |2019-06-02|
XX4@AD         |Apple      |0.45            |idf_1            |2019-06-02|
XX1@AD         |Apple      |23.11           |idf_1            |2019-06-02|

client_username - id of user in domain
workstation - user workstation
session_duration - duration (in hours) of the active session (user logged on hist host)
access_point_name - the name of access point that supplies the network to users host
start_date - start session

このようなデータフレームを実現したいと思います。

client_username|workstation|session_duration|access_point_name|start_date|
XX1@AD         |Apple      |1.55            |idf_1            |2019-06-01|
XX2@AD         |Apple      |8               |idf_2            |2019-06-04|
XX2@AD         |Apple      |8               |idf_2            |2019-06-05|
XX3@AD         |Apple      |8               |idf_3            |2019-06-02|
XX3@AD         |Apple      |8               |idf_3            |2019-06-03|
XX3@AD         |Apple      |8               |idf_3            |2019-06-04|
XX3@AD         |Apple      |8               |idf_3            |2019-06-05|
XX4@AD         |Apple      |0.45            |idf_1            |2019-06-02|
XX1@AD         |Apple      |23.11           |idf_1            |2019-06-02|

アイデアは次のとおりです。*セッションの長さが24時間以上、48時間未満の場合は、変更したいと思います。

XX2@AD         |Apple      |30.12           |idf_2            |2019-06-04|

それに:

XX2@AD         |Apple      |8               |idf_2            |2019-06-04|
XX2@AD         |Apple      |8               |idf_2            |2019-06-05|

セッションの期間は8時間に変更されますが、日数は2日に増加します(2019-06-04および2019-06-05)。48時間(3日)、72時間(4日)などを超える期間の分析状況。

私はpysparkを学び始めています。私が使用しようとしたunionか、crossJoinデータフレーム上に、これは非常に現時点では私のために複雑になります。このタスクを使用して実行したいと思いますpyspark

jxc

試すことができるいくつかの方法は次のとおりです。

方法-1:文字列関数:繰り返し部分文字列

  1. 繰り返し回数を計算する n = ceil(session_duration/24)
  2. a部分文字列8,n何度も繰り返す文字列作成し、substring()またはregexp_replace()を使用して末尾のコンマを削除します,
  3. aカンマで分割し、それをposとの行に分解しますsession_duration
  4. pos上記の手順でstart_dateを調整します
  5. 文字列session_durationをにキャストしますdouble

以下のコード例を参照してください。

from pyspark.sql import functions as F

# assume the columns in your dataframe are read with proper data types
# for example using inferSchema=True
df = spark.read.csv('/path/to/file', header=True, inferSchema=True)

df1 = df.withColumn('n', F.ceil(F.col('session_duration')/24).astype('int')) \
        .withColumn('a', F.when(F.col('n')>1, F.expr('substring(repeat("8,",n),0,2*n-1)')).otherwise(F.col('session_duration')))

>>> df1.show()
+---------------+-----------+----------------+-----------------+-------------------+---+-------+
|client_username|workstation|session_duration|access_point_name|         start_date|  n|      a|
+---------------+-----------+----------------+-----------------+-------------------+---+-------+
|         XX1@AD|      Apple|            1.55|            idf_1|2019-06-01 00:00:00|  1|   1.55|
|         XX2@AD|      Apple|           30.12|            idf_2|2019-06-04 00:00:00|  2|    8,8|
|         XX3@AD|      Apple|           78.25|            idf_3|2019-06-02 00:00:00|  4|8,8,8,8|
|         XX4@AD|      Apple|            0.45|            idf_1|2019-06-02 00:00:00|  1|   0.45|
|         XX1@AD|      Apple|           23.11|            idf_1|2019-06-02 00:00:00|  1|  23.11|
+---------------+-----------+----------------+-----------------+-------------------+---+-------+

df_new = df1.select(
          'client_username'
        , 'workstation'
        , F.posexplode(F.split('a', ',')).alias('pos', 'session_duration')
        , 'access_point_name'
        , F.expr('date_add(start_date, pos)').alias('start_date')
    ).drop('pos')

>>> df_new.show()
+---------------+-----------+----------------+-----------------+----------+
|client_username|workstation|session_duration|access_point_name|start_date|
+---------------+-----------+----------------+-----------------+----------+
|         XX1@AD|      Apple|            1.55|            idf_1|2019-06-01|
|         XX2@AD|      Apple|               8|            idf_2|2019-06-04|
|         XX2@AD|      Apple|               8|            idf_2|2019-06-05|
|         XX3@AD|      Apple|               8|            idf_3|2019-06-02|
|         XX3@AD|      Apple|               8|            idf_3|2019-06-03|
|         XX3@AD|      Apple|               8|            idf_3|2019-06-04|
|         XX3@AD|      Apple|               8|            idf_3|2019-06-05|
|         XX4@AD|      Apple|            0.45|            idf_1|2019-06-02|
|         XX1@AD|      Apple|           23.11|            idf_1|2019-06-02|
+---------------+-----------+----------------+-----------------+----------+

上記のコードは、1つのチェーンに書き込むこともできます。

df_new = df.withColumn('n'
                , F.ceil(F.col('session_duration')/24).astype('int')
          ).withColumn('a'
                , F.when(F.col('n')>1, F.expr('substring(repeat("8,",n),0,2*n-1)')).otherwise(F.col('session_duration'))
          ).select('client_username'
                , 'workstation'
                , F.posexplode(F.split('a', ',')).alias('pos', 'session_duration')
                , 'access_point_name'
                , F.expr('date_add(start_date, pos)').alias('start_date')
          ).withColumn('session_duration'
                , F.col('session_duration').astype('double')
          ).drop('pos')

方法2:配列関数array_repeat(pyspark 2.4+)

Method-1と同様ですが、aすでに配列であるため、文字列を配列に分割する必要はありません。

df1 = df.withColumn('n', F.ceil(F.col('session_duration')/24).astype('int')) \
        .withColumn('a', F.when(F.col('n')>1, F.expr('array_repeat(8,n)')).otherwise(F.array('session_duration')))

>>> df1.show()
+---------------+-----------+----------------+-----------------+-------------------+---+--------------------+
|client_username|workstation|session_duration|access_point_name|         start_date|  n|                   a|
+---------------+-----------+----------------+-----------------+-------------------+---+--------------------+
|         XX1@AD|      Apple|            1.55|            idf_1|2019-06-01 00:00:00|  1|              [1.55]|
|         XX2@AD|      Apple|           30.12|            idf_2|2019-06-04 00:00:00|  2|          [8.0, 8.0]|
|         XX3@AD|      Apple|           78.25|            idf_3|2019-06-02 00:00:00|  4|[8.0, 8.0, 8.0, 8.0]|
|         XX4@AD|      Apple|            0.45|            idf_1|2019-06-02 00:00:00|  1|              [0.45]|
|         XX1@AD|      Apple|           23.11|            idf_1|2019-06-02 00:00:00|  1|             [23.11]|
+---------------+-----------+----------------+-----------------+-------------------+---+--------------------+

df_new = df1.select('client_username'
            , 'workstation'
            , F.posexplode('a').alias('pos', 'session_duration')
            , 'access_point_name'
            , F.expr('date_add(start_date, pos)').alias('start_date')
       ).drop('pos')

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

分類Dev

行の最初の値に基づいて、データフレームに新しい列を追加します

分類Dev

Pyspark:udfを使用して、別のデータフレームの値に基づいてデータフレームに新しい列を追加します

分類Dev

groupby 値に基づいて pandas データフレームに新しい列を追加します

分類Dev

列の複数の値に基づいてデータフレームに新しい行を作成します

分類Dev

特定の値の行に基づいて新しいデータフレームを作成します

分類Dev

他の列に基づいてpysparkデータフレームに新しい列を追加する

分類Dev

他の値に基づいてデータフレームの値を行に追加します

分類Dev

Rの他のデータフレームに値が存在するかどうかに基づいて、新しい列に新しい値を追加します

分類Dev

パンダ:別のデータフレームの値に基づいて、データフレームに新しい列を追加します

分類Dev

列の1つの値に基づいて、データフレームに行を追加します

分類Dev

特定の条件に基づいてデータフレーム行に値を追加します

分類Dev

データフレーム内の行を反復処理して新しい列を作成し、新しい列に基づいて列を追加します

分類Dev

Pyspark-異なるデータフレームの値に基づいてデータフレームに列を追加します

分類Dev

既存の列の値に基づいてpandasデータフレームに新しい列を追加する

分類Dev

他の列の値に基づいてデータフレームに新しい列を追加する

分類Dev

複数の列の値に基づいてデータフレームに新しい列を追加する

分類Dev

空白行の列に基づいてデータフレームに新しい列を追加する

分類Dev

条件に基づいてパンダデータフレームに新しい行を追加する

分類Dev

Rの条件に基づいて、データフレームに複数の新しい列を追加します

分類Dev

他の列の結果に基づいて、データフレームに新しい列を追加します

分類Dev

条件に基づいて多重指数データフレームに新しい列を追加します

分類Dev

列名に基づいてデータフレームに行を追加し、空の列にNAを追加します

分類Dev

pysparkはデータフレームに新しい行を追加します

分類Dev

条件に基づいてpysparkデータフレームに列を追加します

分類Dev

列の値に基づいて既存のデータフレームに新しい行を追加するにはどうすればよいですか?

分類Dev

欠落している日時値に基づいて空のデータフレーム行を追加する

分類Dev

複数の列の値に基づいて新しいデータフレーム列を作成します

分類Dev

条件に基づいてデータフレームに列を追加します

分類Dev

PySpark:データフレーム内のUUIDを持つ列に基づいて新しい列を追加します

Related 関連記事

  1. 1

    行の最初の値に基づいて、データフレームに新しい列を追加します

  2. 2

    Pyspark:udfを使用して、別のデータフレームの値に基づいてデータフレームに新しい列を追加します

  3. 3

    groupby 値に基づいて pandas データフレームに新しい列を追加します

  4. 4

    列の複数の値に基づいてデータフレームに新しい行を作成します

  5. 5

    特定の値の行に基づいて新しいデータフレームを作成します

  6. 6

    他の列に基づいてpysparkデータフレームに新しい列を追加する

  7. 7

    他の値に基づいてデータフレームの値を行に追加します

  8. 8

    Rの他のデータフレームに値が存在するかどうかに基づいて、新しい列に新しい値を追加します

  9. 9

    パンダ:別のデータフレームの値に基づいて、データフレームに新しい列を追加します

  10. 10

    列の1つの値に基づいて、データフレームに行を追加します

  11. 11

    特定の条件に基づいてデータフレーム行に値を追加します

  12. 12

    データフレーム内の行を反復処理して新しい列を作成し、新しい列に基づいて列を追加します

  13. 13

    Pyspark-異なるデータフレームの値に基づいてデータフレームに列を追加します

  14. 14

    既存の列の値に基づいてpandasデータフレームに新しい列を追加する

  15. 15

    他の列の値に基づいてデータフレームに新しい列を追加する

  16. 16

    複数の列の値に基づいてデータフレームに新しい列を追加する

  17. 17

    空白行の列に基づいてデータフレームに新しい列を追加する

  18. 18

    条件に基づいてパンダデータフレームに新しい行を追加する

  19. 19

    Rの条件に基づいて、データフレームに複数の新しい列を追加します

  20. 20

    他の列の結果に基づいて、データフレームに新しい列を追加します

  21. 21

    条件に基づいて多重指数データフレームに新しい列を追加します

  22. 22

    列名に基づいてデータフレームに行を追加し、空の列にNAを追加します

  23. 23

    pysparkはデータフレームに新しい行を追加します

  24. 24

    条件に基づいてpysparkデータフレームに列を追加します

  25. 25

    列の値に基づいて既存のデータフレームに新しい行を追加するにはどうすればよいですか?

  26. 26

    欠落している日時値に基づいて空のデータフレーム行を追加する

  27. 27

    複数の列の値に基づいて新しいデータフレーム列を作成します

  28. 28

    条件に基づいてデータフレームに列を追加します

  29. 29

    PySpark:データフレーム内のUUIDを持つ列に基づいて新しい列を追加します

ホットタグ

アーカイブ