SparkdropDuplicates
保留第一个实例,并忽略该键的所有后续出现。是否可以在保留最新的情况下删除重复项?
例如,如果下面是我得到的微型批次,那么我想保留每个国家的最新记录(按时间戳记字段排序)。
batchId:0
Australia, 10, 2020-05-05 00:00:06
Belarus, 10, 2020-05-05 00:00:06
batchId:1
Australia, 10, 2020-05-05 00:00:08
Belarus, 10, 2020-05-05 00:00:03
然后,batchId 1之后的输出应低于-
Australia, 10, 2020-05-05 00:00:08
Belarus, 10, 2020-05-05 00:00:06
Update-1这是我拥有的当前代码
//KafkaDF is a streaming dataframe created from Kafka as source
val streamingDF = kafkaDF.dropDuplicates("country")
streamingDF.writeStream
.trigger(Trigger.ProcessingTime(10000L))
.outputMode("update")
.foreachBatch {
(batchDF: DataFrame, batchId: Long) => {
println("batchId: "+ batchId)
batchDF.show()
}
}.start()
我想输出的所有行都是新的或具有比迄今为止已处理的先前批处理中的任何记录都要大的时间戳。下面的例子
在batchId:0之后-两个国家都第一次出现,所以我应该在输出中将它们
Australia, 10, 2020-05-05 00:00:06
Belarus, 10, 2020-05-05 00:00:06
在batchId:1之后-白俄罗斯的时间戳早于我在批次0中收到的时间戳,因此不在输出中显示。显示澳大利亚是因为它的时间戳比我到目前为止所看到的要晚。
Australia, 10, 2020-05-05 00:00:08
现在,假设batchId 2同时出现这两个记录,因为它迟到了,所以它不应在该批次的输出中显示任何内容。
输入batchId:2
Australia, 10, 2020-05-05 00:00:01
Belarus, 10, 2020-05-05 00:00:01
在batchId之后:2
.
更新2
为每个批次添加输入和预期记录。标有红色的行将被丢弃,并且不会在输出中显示为具有相同国家/地区名称的另一行,并且在以前的批次中可以看到最新的时间戳记
为了避免流应用程序中的事件迟到,您需要在应用程序中保持一个状态,该状态在每个情况下都跟踪每个键的最新处理事件country
。
case class AppState(country:String, latestTs:java.sql.Timestamp)
对于微批处理,您可能会收到多个事件,当您执行该操作时,groupByKey(_.country)
您会得到一个属于的事件key(country)
,您需要将其与状态进行比较以找到最新的输入事件,并使用键的最新时间戳来更新状态。进行最新事件以进行进一步处理。对于迟到的事件,它应返回Option[Event]
并在后续过程中将其过滤掉。
请参阅此博客以获取详细说明。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句