如何在Flink中实现一个可以缓存直到超时并在超时后触发的触发器?
我希望如果窗口中至少有一个元素,则要注册触发器,然后缓冲到第二个元素,并在经过一秒钟时触发。如果窗口中没有元素,则触发器不会自行注册,因此我不希望看到任何输出。
我不希望触发器每秒产生大量流量,而不管窗口中是否存在元素。另一方面说,窗口中只有一个元素,我不希望它坐在那里,等到水印或永久出现。相反,我希望有一个超时时间,以便我可以看到一个元素在第二秒后至少消失。
不ProcessingTimeTrigger.create()
这样做呢?如果是这样,ProcessingTimeTrigger.create()
vs之间有什么区别CountinousProcessingTimeTrigger
?
正常的一秒钟长的处理时间窗口将为您提供一个窗口,其中包含一秒钟内发生的所有事件,对于其中至少有一个事件的任何一秒钟,该窗口都是如此。但是此窗口不会与第一个事件对齐;它将与时间时钟对齐。因此,例如,如果某个窗口中的第一个事件发生在给定第二个事件的一半,那么该窗口将仅包含第一个事件之后500毫秒的事件。
一个ProcessingTimeTrigger
闪光一次,在窗口的结束。一个CountinousProcessingTimeTrigger
在某个指定的速率重复触发。
为了准确获取您描述的语义,您需要一个自定义触发器。您可以执行与OneSecondIntervalTrigger示例类似的操作,除了您要从使用事件时间切换到处理时间,并且只触发一次,而不是重复触发。
这将使您具有以下内容:
public static class OneSecondIntervalTrigger extends Trigger<SensorReading, TimeWindow> {
@Override
public TriggerResult onElement(SensorReading r, long ts, TimeWindow w, TriggerContext ctx) throws Exception {
// firstSeen will be false if not set yet
ValueState<Boolean> firstSeen = ctx.getPartitionedState(
new ValueStateDescriptor<>("firstSeen", Types.BOOLEAN));
// register initial timer only for first element
if (firstSeen.value() == null) {
// FIRE the window 1000 msec after the first event
long now = ctx.getCurrentProcessingTime();
ctx.registerProcessingTimeTimer(now + 1000);
fireSeen.update(true);
}
// Continue. Do not evaluate window now
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long ts, TimeWindow w, TriggerContext ctx) throws Exception {
// Continue. We don't use event time timers
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long ts, TimeWindow w, TriggerContext ctx) throws Exception {
// Evaluate the window now
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public void clear(TimeWindow w, TriggerContext ctx) throws Exception {
// Clear trigger state
ValueState<Boolean> firstSeen = ctx.getPartitionedState(
new ValueStateDescriptor<>("firstSeen", Types.BOOLEAN));
firstSeen.clear();
}
}
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句