私はこのようなデータフレームを持っています:
client_username|workstation|session_duration|access_point_name|start_date|
XX1@AD |Apple |1.55 |idf_1 |2019-06-01|
XX2@AD |Apple |30.12 |idf_2 |2019-06-04|
XX3@AD |Apple |78.25 |idf_3 |2019-06-02|
XX4@AD |Apple |0.45 |idf_1 |2019-06-02|
XX1@AD |Apple |23.11 |idf_1 |2019-06-02|
client_username - id of user in domain
workstation - user workstation
session_duration - duration (in hours) of the active session (user logged on hist host)
access_point_name - the name of access point that supplies the network to users host
start_date - start session
このようなデータフレームを実現したいと思います。
client_username|workstation|session_duration|access_point_name|start_date|
XX1@AD |Apple |1.55 |idf_1 |2019-06-01|
XX2@AD |Apple |8 |idf_2 |2019-06-04|
XX2@AD |Apple |8 |idf_2 |2019-06-05|
XX3@AD |Apple |8 |idf_3 |2019-06-02|
XX3@AD |Apple |8 |idf_3 |2019-06-03|
XX3@AD |Apple |8 |idf_3 |2019-06-04|
XX3@AD |Apple |8 |idf_3 |2019-06-05|
XX4@AD |Apple |0.45 |idf_1 |2019-06-02|
XX1@AD |Apple |23.11 |idf_1 |2019-06-02|
アイデアは次のとおりです。*セッションの長さが24時間以上、48時間未満の場合は、変更したいと思います。
XX2@AD |Apple |30.12 |idf_2 |2019-06-04|
それに:
XX2@AD |Apple |8 |idf_2 |2019-06-04|
XX2@AD |Apple |8 |idf_2 |2019-06-05|
セッションの期間は8時間に変更されますが、日数は2日に増加します(2019-06-04および2019-06-05)。48時間(3日)、72時間(4日)などを超える期間の分析状況。
私はpysparkを学び始めています。私が使用しようとしたunion
か、crossJoin
データフレーム上に、これは非常に現時点では私のために複雑になります。このタスクを使用して実行したいと思いますpyspark
。
試すことができるいくつかの方法は次のとおりです。
n = ceil(session_duration/24)
a
部分文字列8,
をn
何度も繰り返す文字列を作成し、substring()またはregexp_replace()を使用して末尾のコンマを削除します,
a
カンマで分割し、それをpos
との行に分解しますsession_duration
pos
上記の手順でstart_dateを調整しますsession_duration
をにキャストしますdouble
以下のコード例を参照してください。
from pyspark.sql import functions as F
# assume the columns in your dataframe are read with proper data types
# for example using inferSchema=True
df = spark.read.csv('/path/to/file', header=True, inferSchema=True)
df1 = df.withColumn('n', F.ceil(F.col('session_duration')/24).astype('int')) \
.withColumn('a', F.when(F.col('n')>1, F.expr('substring(repeat("8,",n),0,2*n-1)')).otherwise(F.col('session_duration')))
>>> df1.show()
+---------------+-----------+----------------+-----------------+-------------------+---+-------+
|client_username|workstation|session_duration|access_point_name| start_date| n| a|
+---------------+-----------+----------------+-----------------+-------------------+---+-------+
| XX1@AD| Apple| 1.55| idf_1|2019-06-01 00:00:00| 1| 1.55|
| XX2@AD| Apple| 30.12| idf_2|2019-06-04 00:00:00| 2| 8,8|
| XX3@AD| Apple| 78.25| idf_3|2019-06-02 00:00:00| 4|8,8,8,8|
| XX4@AD| Apple| 0.45| idf_1|2019-06-02 00:00:00| 1| 0.45|
| XX1@AD| Apple| 23.11| idf_1|2019-06-02 00:00:00| 1| 23.11|
+---------------+-----------+----------------+-----------------+-------------------+---+-------+
df_new = df1.select(
'client_username'
, 'workstation'
, F.posexplode(F.split('a', ',')).alias('pos', 'session_duration')
, 'access_point_name'
, F.expr('date_add(start_date, pos)').alias('start_date')
).drop('pos')
>>> df_new.show()
+---------------+-----------+----------------+-----------------+----------+
|client_username|workstation|session_duration|access_point_name|start_date|
+---------------+-----------+----------------+-----------------+----------+
| XX1@AD| Apple| 1.55| idf_1|2019-06-01|
| XX2@AD| Apple| 8| idf_2|2019-06-04|
| XX2@AD| Apple| 8| idf_2|2019-06-05|
| XX3@AD| Apple| 8| idf_3|2019-06-02|
| XX3@AD| Apple| 8| idf_3|2019-06-03|
| XX3@AD| Apple| 8| idf_3|2019-06-04|
| XX3@AD| Apple| 8| idf_3|2019-06-05|
| XX4@AD| Apple| 0.45| idf_1|2019-06-02|
| XX1@AD| Apple| 23.11| idf_1|2019-06-02|
+---------------+-----------+----------------+-----------------+----------+
上記のコードは、1つのチェーンに書き込むこともできます。
df_new = df.withColumn('n'
, F.ceil(F.col('session_duration')/24).astype('int')
).withColumn('a'
, F.when(F.col('n')>1, F.expr('substring(repeat("8,",n),0,2*n-1)')).otherwise(F.col('session_duration'))
).select('client_username'
, 'workstation'
, F.posexplode(F.split('a', ',')).alias('pos', 'session_duration')
, 'access_point_name'
, F.expr('date_add(start_date, pos)').alias('start_date')
).withColumn('session_duration'
, F.col('session_duration').astype('double')
).drop('pos')
Method-1と同様ですが、a
すでに配列であるため、文字列を配列に分割する必要はありません。
df1 = df.withColumn('n', F.ceil(F.col('session_duration')/24).astype('int')) \
.withColumn('a', F.when(F.col('n')>1, F.expr('array_repeat(8,n)')).otherwise(F.array('session_duration')))
>>> df1.show()
+---------------+-----------+----------------+-----------------+-------------------+---+--------------------+
|client_username|workstation|session_duration|access_point_name| start_date| n| a|
+---------------+-----------+----------------+-----------------+-------------------+---+--------------------+
| XX1@AD| Apple| 1.55| idf_1|2019-06-01 00:00:00| 1| [1.55]|
| XX2@AD| Apple| 30.12| idf_2|2019-06-04 00:00:00| 2| [8.0, 8.0]|
| XX3@AD| Apple| 78.25| idf_3|2019-06-02 00:00:00| 4|[8.0, 8.0, 8.0, 8.0]|
| XX4@AD| Apple| 0.45| idf_1|2019-06-02 00:00:00| 1| [0.45]|
| XX1@AD| Apple| 23.11| idf_1|2019-06-02 00:00:00| 1| [23.11]|
+---------------+-----------+----------------+-----------------+-------------------+---+--------------------+
df_new = df1.select('client_username'
, 'workstation'
, F.posexplode('a').alias('pos', 'session_duration')
, 'access_point_name'
, F.expr('date_add(start_date, pos)').alias('start_date')
).drop('pos')
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加