这是我的流数据的样子:
time | id | group
---- | ---| ---
1 | a1 | b1
2 | a1 | b2
3 | a1 | b3
4 | a2 | b3
在我们的窗口中考虑上述所有示例。我的用例获取最新的不同 ID。
我需要输出如下所示:
time | id | group
---- | ---| ---
3 | a1 | b3
4 | a2 | b3
我怎样才能在 Flink 中实现这一点?
我知道窗口函数WindowFunction
。但是,我不能绕着做这件事。
我试过这个只是为了获得不同的ID。如何将此功能扩展到我的用例?
class DistinctGrid extends WindowFunction[UserMessage, String, Tuple, TimeWindow] {
override def apply(key: Tuple, window: TimeWindow, input: Iterable[UserMessage], out: Collector[String]): Unit = {
val distinctGeo = input.map(_.id).toSet
for (i <- distinctGeo) {
out.collect(i)
}
}
}
如果您通过 id 字段键控流,则无需考虑不同的 id —— 每个不同的键都有一个单独的窗口。您的窗口函数只需要遍历窗口内容以找到具有最大时间戳的 UserMessage,并将其作为窗口的结果输出(对于该键)。但是,有一个内置函数可以做到这一点——查看maxBy()的文档——所以在这种情况下不需要窗口函数。
粗略地说,这看起来像
stream.keyBy("id")
.timeWindow(Time.minutes(10))
.maxBy("time")
.print()
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句