如何在flink中实现一个触发器,该缓冲区一直缓存到超时,并在超时后触发?

用户名

如何在Flink中实现一个可以缓存直到超时并在超时后触发的触发器?

我希望如果窗口中至少有一个元素,则要注册触发器,然后缓冲到第二个元素,并在经过一秒钟时触发。如果窗口中没有元素,则触发器不会自行注册,因此我不希望看到任何输出。

我不希望触发器每秒产生大量流量,而不管窗口中是否存在元素。另一方面说,窗口中只有一个元素,我不希望它坐在那里,等到水印或永久出现。相反,我希望有一个超时时间,以便我可以看到一个元素在第二秒后至少消失。

ProcessingTimeTrigger.create()这样做呢?如果是这样,ProcessingTimeTrigger.create()vs之间有什么区别CountinousProcessingTimeTrigger

大卫·安德森

正常的一秒钟长的处理时间窗口将为您提供一个窗口,其中包含一秒钟内发生的所有事件,对于其中至少有一个事件的任何一秒钟,该窗口都是如此。但是此窗口不会与第一个事件对齐;它将与时间时钟对齐。因此,例如,如果某个窗口中的第一个事件发生在给定第二个事件的一半,那么该窗口将仅包含第一个事件之后500毫秒的事件。

一个ProcessingTimeTrigger闪光一次,在窗口的结束。一个CountinousProcessingTimeTrigger在某个指定的速率重复触发。

为了准确获取您描述的语义,您需要一个自定义触发器。您可以执行与OneSecondIntervalTrigger示例类似的操作,除了您要从使用事件时间切换到处理时间,并且只触发一次,而不是重复触发。

这将使您具有以下内容:

public static class OneSecondIntervalTrigger extends Trigger<SensorReading, TimeWindow> {

    @Override
    public TriggerResult onElement(SensorReading r, long ts, TimeWindow w, TriggerContext ctx) throws Exception {
        // firstSeen will be false if not set yet
        ValueState<Boolean> firstSeen = ctx.getPartitionedState(
            new ValueStateDescriptor<>("firstSeen", Types.BOOLEAN));

        // register initial timer only for first element
        if (firstSeen.value() == null) {
            // FIRE the window 1000 msec after the first event
            long now = ctx.getCurrentProcessingTime();
            ctx.registerProcessingTimeTimer(now + 1000);
            fireSeen.update(true);
        }
        // Continue. Do not evaluate window now
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long ts, TimeWindow w, TriggerContext ctx) throws Exception {
        // Continue. We don't use event time timers
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long ts, TimeWindow w, TriggerContext ctx) throws Exception {
        // Evaluate the window now
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public void clear(TimeWindow w, TriggerContext ctx) throws Exception {
        // Clear trigger state
        ValueState<Boolean> firstSeen = ctx.getPartitionedState(
            new ValueStateDescriptor<>("firstSeen", Types.BOOLEAN));
        firstSeen.clear();
    }
}

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

将两个触发器保存到一个触发器文件中

来自分类Dev

如何在更新表后运行触发器然后在同一个表中运行语句更新

来自分类Dev

如何在Zapier触发器中JSON.parse一个数组?

来自分类Dev

如何在zabbix触发器中显示一个变量和一个常数

来自分类Dev

如何编写一个触发器,该触发器可以在插入时将一个表中的新行复制到另一个表中?

来自分类Dev

如何在zabbix中添加一个通知我其他触发器设置为unknow的触发器?

来自分类Dev

想要缓冲一个Observable直到另一个触发,然后删除缓冲区并按一个订阅正常触发

来自分类Dev

触发第一个事件后几秒钟的RX缓冲区事件

来自分类Dev

在一个触发器中插入和删除

来自分类Dev

PostgreSQL创建触发器,该触发器在表的每次插入或更新时运行一个函数

来自分类Dev

WPF一个触发多个值的触发器

来自分类Dev

另一个触发器完成后,触发点击事件

来自分类Dev

另一个触发器完成后,触发点击事件

来自分类Dev

如何在协议缓冲区中存储一个字节?

来自分类Dev

如何在多个数据库表上创建一个触发器

来自分类Dev

如何在一个触发器内放置多组方程

来自分类Dev

如何在多个数据库表上创建一个触发器

来自分类Dev

如何在默认情况下使一个触发器保持打开状态

来自分类Dev

如何在两个表上一个接一个地编写更新触发器

来自分类Dev

自第一个新组元素以来具有超时的Rx缓冲区

来自分类Dev

如何创建一个 MySQL 触发器,用于在插入或更新后使用另一个表中字段的数据更新表的总和

来自分类Dev

在SQL中创建一个触发器,该触发器从一个表中读取数据,并将新数据插入到另一个表中

来自分类Dev

如何在触发器中将值从一个查询传递到另一个查询

来自分类Dev

如何编写触发器以更新另一个表中的行?

来自分类Dev

如何仅在一个事务中禁用PostgreSQL触发器?

来自分类Dev

如何使用Postgres触发器双重写入一个表中的两列?

来自分类Dev

如何编写触发器以更新另一个表中的行?

来自分类Dev

如何使用删除触发器从同一个表中删除行?

来自分类Dev

如何编写一个触发器来检查 Oracle 中的更新值?

Related 相关文章

  1. 1

    将两个触发器保存到一个触发器文件中

  2. 2

    如何在更新表后运行触发器然后在同一个表中运行语句更新

  3. 3

    如何在Zapier触发器中JSON.parse一个数组?

  4. 4

    如何在zabbix触发器中显示一个变量和一个常数

  5. 5

    如何编写一个触发器,该触发器可以在插入时将一个表中的新行复制到另一个表中?

  6. 6

    如何在zabbix中添加一个通知我其他触发器设置为unknow的触发器?

  7. 7

    想要缓冲一个Observable直到另一个触发,然后删除缓冲区并按一个订阅正常触发

  8. 8

    触发第一个事件后几秒钟的RX缓冲区事件

  9. 9

    在一个触发器中插入和删除

  10. 10

    PostgreSQL创建触发器,该触发器在表的每次插入或更新时运行一个函数

  11. 11

    WPF一个触发多个值的触发器

  12. 12

    另一个触发器完成后,触发点击事件

  13. 13

    另一个触发器完成后,触发点击事件

  14. 14

    如何在协议缓冲区中存储一个字节?

  15. 15

    如何在多个数据库表上创建一个触发器

  16. 16

    如何在一个触发器内放置多组方程

  17. 17

    如何在多个数据库表上创建一个触发器

  18. 18

    如何在默认情况下使一个触发器保持打开状态

  19. 19

    如何在两个表上一个接一个地编写更新触发器

  20. 20

    自第一个新组元素以来具有超时的Rx缓冲区

  21. 21

    如何创建一个 MySQL 触发器,用于在插入或更新后使用另一个表中字段的数据更新表的总和

  22. 22

    在SQL中创建一个触发器,该触发器从一个表中读取数据,并将新数据插入到另一个表中

  23. 23

    如何在触发器中将值从一个查询传递到另一个查询

  24. 24

    如何编写触发器以更新另一个表中的行?

  25. 25

    如何仅在一个事务中禁用PostgreSQL触发器?

  26. 26

    如何使用Postgres触发器双重写入一个表中的两列?

  27. 27

    如何编写触发器以更新另一个表中的行?

  28. 28

    如何使用删除触发器从同一个表中删除行?

  29. 29

    如何编写一个触发器来检查 Oracle 中的更新值?

热门标签

归档