如何在Spark结构化流中基于时间戳字段重复数据删除并保持最新?

conetfun

SparkdropDuplicates保留第一个实例,并忽略该键的所有后续出现。是否可以在保留最新的情况下删除重复项?

例如,如果下面是我得到的微型批次,那么我想保留每个国家的最新记录(按时间戳记字段排序)。

batchId:0

Australia, 10, 2020-05-05 00:00:06
Belarus, 10, 2020-05-05 00:00:06

batchId:1

Australia, 10, 2020-05-05 00:00:08
Belarus, 10, 2020-05-05 00:00:03

然后,batchId 1之后的输出应低于-

Australia, 10, 2020-05-05 00:00:08
Belarus, 10, 2020-05-05 00:00:06

Update-1这是我拥有的当前代码

//KafkaDF is a streaming dataframe created from Kafka as source
val streamingDF = kafkaDF.dropDuplicates("country")

streamingDF.writeStream
    .trigger(Trigger.ProcessingTime(10000L))
    .outputMode("update")
    .foreachBatch {
      (batchDF: DataFrame, batchId: Long) => {
        println("batchId: "+ batchId)
        batchDF.show()
      }
    }.start()

我想输出的所有行都是新的或具有比迄今为止已处理的先前批处理中的任何记录都要大的时间戳。下面的例子

在batchId:0之后-两个国家都第一次出现,所以我应该在输出中将它们

Australia, 10, 2020-05-05 00:00:06
Belarus, 10, 2020-05-05 00:00:06

在batchId:1之后-白俄罗斯的时间戳早于我在批次0中收到的时间戳,因此不在输出中显示。显示澳大利亚是因为它的时间戳比我到目前为止所看到的要晚。

Australia, 10, 2020-05-05 00:00:08

现在,假设batchId 2同时出现这两个记录,因为它迟到了,所以它不应在该批次的输出中显示任何内容。

输入batchId:2

Australia, 10, 2020-05-05 00:00:01
Belarus, 10, 2020-05-05 00:00:01

在batchId之后:2

.

更新2

为每个批次添加输入和预期记录。标有红色的行将被丢弃,并且不会在输出中显示为具有相同国家/地区名称的另一行,并且在以前的批次中可以看到最新的时间戳记在此处输入图片说明

西瓦库马尔

为了避免流应用程序中的事件迟到,您需要在应用程序中保持一个状态,该状态在每个情况下都跟踪每个键的最新处理事件country

case class AppState(country:String, latestTs:java.sql.Timestamp)

对于微批处理,您可能会收到多个事件,当您执行该操作时,groupByKey(_.country)您会得到一个属于的事件key(country),您需要将其与状态进行比较以找到最新的输入事件,并使用键的最新时间戳来更新状态。进行最新事件以进行进一步处理。对于迟到的事件,它应返回Option[Event]并在后续过程中将其过滤掉。

请参阅此博客以获取详细说明。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

如何删除Spark结构化流创建的旧数据?

来自分类Dev

如何将静态数据帧与Spark结构化流中的流数据进行比较?

来自分类Dev

如何在(Py)Spark结构化流中捕获不正确的(损坏的)JSON记录?

来自分类Dev

如何在Hive中对文件进行重复数据删除并保持原始排序顺序?

来自分类Dev

我们如何在Spark结构化流中管理偏移量?(_spark_metadata问题)

来自分类Dev

如何使用(Py)Spark结构化流为带有时间戳(来自Kafka)的JSON记录定义架构?-显示空值

来自分类Dev

Spark结构化流中的流数据帧读取模式

来自分类Dev

如何在结构化流中正确使用foreachBatch.batchDF.unpersist()?(保持错误)

来自分类Dev

如何在 apache 梁/数据流中跨重叠滑动窗口进行重复数据删除

来自分类Dev

从Spark结构化流以JSON数组形式写入数据

来自分类Dev

如何在Kafka Direct Stream中使用Spark结构化流?

来自分类Dev

Spark结构化流从查询异常中恢复

来自分类Dev

基于 Tableau Prep 中的两个字段进行重复数据删除

来自分类Dev

Spark结构化流时,DataFrame中的字符串列如何拆分为多个列

来自分类Dev

如何从MySQL中删除除最新数据之外的重复数据

来自分类Dev

如何从GridView中删除重复数据?

来自分类Dev

结构化流如何动态解析kafka的json数据

来自分类Dev

如何计算 ApacheSpark 结构化流中数据帧 API 的 z 分数?

来自分类Dev

如何在 Azure DataLake 中合并基本和多个增量结构化流

来自分类Dev

从结构化数组中删除`dtype`字段

来自分类常见问题

如何解析结构化流中的JSON记录?

来自分类Dev

如何解析结构化流中的JSON记录?

来自分类Dev

如何在Spark中将结构化数据映射到schemaRDD?

来自分类Dev

如何在Spark中将结构化数据映射到schemaRDD?

来自分类Dev

如何在R中的非结构化序列中删除常规月份的观测值?

来自分类Dev

在源中更新其基础数据时,结构化流中使用的Spark DataFrame会发生什么情况?

来自分类Dev

如何在 Marshmallow 中反序列化不同的结构化 JSON 数据?

来自分类Dev

Android-如何在Sqlite中基于重复数据合并行

来自分类Dev

如何在不复制*的情况下从结构化numpy数组中删除列?

Related 相关文章

  1. 1

    如何删除Spark结构化流创建的旧数据?

  2. 2

    如何将静态数据帧与Spark结构化流中的流数据进行比较?

  3. 3

    如何在(Py)Spark结构化流中捕获不正确的(损坏的)JSON记录?

  4. 4

    如何在Hive中对文件进行重复数据删除并保持原始排序顺序?

  5. 5

    我们如何在Spark结构化流中管理偏移量?(_spark_metadata问题)

  6. 6

    如何使用(Py)Spark结构化流为带有时间戳(来自Kafka)的JSON记录定义架构?-显示空值

  7. 7

    Spark结构化流中的流数据帧读取模式

  8. 8

    如何在结构化流中正确使用foreachBatch.batchDF.unpersist()?(保持错误)

  9. 9

    如何在 apache 梁/数据流中跨重叠滑动窗口进行重复数据删除

  10. 10

    从Spark结构化流以JSON数组形式写入数据

  11. 11

    如何在Kafka Direct Stream中使用Spark结构化流?

  12. 12

    Spark结构化流从查询异常中恢复

  13. 13

    基于 Tableau Prep 中的两个字段进行重复数据删除

  14. 14

    Spark结构化流时,DataFrame中的字符串列如何拆分为多个列

  15. 15

    如何从MySQL中删除除最新数据之外的重复数据

  16. 16

    如何从GridView中删除重复数据?

  17. 17

    结构化流如何动态解析kafka的json数据

  18. 18

    如何计算 ApacheSpark 结构化流中数据帧 API 的 z 分数?

  19. 19

    如何在 Azure DataLake 中合并基本和多个增量结构化流

  20. 20

    从结构化数组中删除`dtype`字段

  21. 21

    如何解析结构化流中的JSON记录?

  22. 22

    如何解析结构化流中的JSON记录?

  23. 23

    如何在Spark中将结构化数据映射到schemaRDD?

  24. 24

    如何在Spark中将结构化数据映射到schemaRDD?

  25. 25

    如何在R中的非结构化序列中删除常规月份的观测值?

  26. 26

    在源中更新其基础数据时,结构化流中使用的Spark DataFrame会发生什么情况?

  27. 27

    如何在 Marshmallow 中反序列化不同的结构化 JSON 数据?

  28. 28

    Android-如何在Sqlite中基于重复数据合并行

  29. 29

    如何在不复制*的情况下从结构化numpy数组中删除列?

热门标签

归档