我有一个火花数据框,具有两列(“ time_stamp”和“ message”)。
示例数据框:
Time_stamp Message
2020-12-01 05:28:34:215 some text1 ID: 1
2020-12-01 05:28:40:210 some text2 error: A
2020-12-01 05:28:40:220 some text3 error: B
2020-12-01 05:28:41:203 some text4 error: A
2020-12-01 05:30:43:201 some text5 ID: 1
2020-12-01 05:32:50:215 some text6 ID: 2
2020-12-01 05:32:50:220 some text7 error: A
2020-12-01 05:48:51:220 some text8 error: C
2020-12-01 05:48:52:203 some text9 error: B
2020-12-01 05:51:53:201 some text10 ID: 2
我想制作另一个具有ID和包含相同ID的两行之间有明显错误的数据框。
预期产量:
表格示例:
ID Error
1 A
1 B
2 A
2 C
2 B
谢谢
试试下面的代码。代码按ID分组,收集错误消息并为每个不同的错误消息获取最早的错误消息。时间顺序保持不变。
import pyspark.sql.functions as F
from pyspark.sql.window import Window
df2 = df.withColumn(
'Time_stamp',
F.to_timestamp('Time_stamp', 'yyyy-MM-dd HH:mm:ss:SSS')
).withColumn(
'ID',
F.regexp_extract('Message', 'ID: ([a-zA-Z0-9]+)', 1)
).withColumn(
'ID',
F.last(F.when(F.col('ID') != '', F.col('ID')), True).over(Window.orderBy('Time_stamp'))
).filter(
F.col('message').rlike('error')
).withColumn(
'Message',
F.regexp_extract('Message', 'error: (.*)', 1)
).groupBy('ID').agg(
F.collect_set(F.array('Message', 'Time_stamp')).alias('Message')
).select(
'ID',
F.explode('Message').alias('Message')
).selectExpr(
'ID',
'Message[0] as error',
'Message[1] as Time_stamp'
).withColumn(
'rn',
F.row_number().over(Window.partitionBy('ID', 'error').orderBy('Time_stamp'))
).filter('rn = 1').orderBy('Time_stamp').select('ID', 'error')
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句