在flink processFunction中,onTimer()函数中的所有mapstate为空

一种薄荷糖

我想通过processKeyedFunction实现aggregationFunction,因为默认的aggregationFunction不支持丰富的功能,此外,我尝试了aggreagationFunction + processWindowFunction(https://ci.apache.org/projects/flink/flink-docs-stable/dev/ stream / operators / windows.html),但它也不能满足我的需求,因此我必须使用基本的processKeyedFunction来实现aggregationFunction,我的问题如下:

在processFunction中,我定义一个windowState用于阶段元素的聚合值,代码如下:

public void open(Configuration parameters) throws Exception {
followCacheMap = FollowSet.getInstance();
windowState = getRuntimeContext().getMapState(windowStateDescriptor);
currentTimer = getRuntimeContext().getState(new ValueStateDescriptor<Long>(
        "timer",
        Long.class
));

在processElement()函数中,我使用windowState(在open函数中启动的MapState)聚合窗口元素,并注册第一个timeServie以清除当前窗口状态,代码如下:

 @Override
public void processElement(FollowData value, Context ctx, Collector<FollowData> out) throws Exception 
{
      if ( (currentTimer==null || (currentTimer.value() ==null) || (long)currentTimer.value()==0 ) && value.getClickTime() != null) {
            currentTimer.update(value.getClickTime() + interval);
            ctx.timerService().registerEventTimeTimer((long)currentTimer.value());
        } 
       windowState = doMyAggregation(value);
}

在onTimer()函数中,首先,我在下一分钟内注册next timeService,并清除窗口状态

 @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<FollowData> out) throws Exception {
        currentTimer.update(timestamp + interval);   // interval is 1 minute
        ctx.timerService().registerEventTimeTimer((long)currentTimer.value());
        
        out.collect(windowState);
        windowState.clear();
     }

但是当程序运行时,我发现onTimer中的所有windowState都是空的,但在processElement()函数中不是empyt,我不知道为什么会这样,也许执行逻辑不同,如何解决这个问题, 提前致谢 !



关于doMyAggregation()部分的新添加代码


windowState是一个MapState,键是“ mykey”,值是一个自定义对象AggregateFollow

public class AggregateFollow {
    private String clicked;
    private String unionid;
    private ArrayList allFollows;
    private int enterCnt;
    private Long clickTime;

}

并且doMyAggregation(value)函数非常像这样,doMyAggregation的功能是获取源字段为“ follow”的所有值,但是如果在1分钟内没有字段为“ click”的值,则“ follow” ”的值应该过时了,总之,就像“跟随”数据和“点击”数据的联接操作,

AggregateFollow acc = windowState.get(windowkey);
    String flag = acc.getClicked();
    ArrayList<FollowData> followDataList = acc.getAllFollows();
    if ("0".equals(flag)) {
        if ("follow".equals(value.getSource())) {
            followDataList.add(value);
            acc.setAllFollows(followDataList);
        }
        if ("click".equals(value.getSource())) {
            String unionid = value.getUnionid();
            clickTime = value.getClickTime();
            if (followDataList.size() > 0) {
                ArrayList listNew = new ArrayList();
                for (FollowData followData : followDataList) {
                    followData.setUnionid(unionid);
                    followData.setClickTime(clickTime);
                    followData.setSource("joined_flag");   // 
                }
                acc.setAllFollows(listNew);
            }
            acc.setClicked("1");
            acc.setUnionid(unionid);
            acc.setClickTime(clickTime);
            windowState.put(windowkey, acc);
        }
    } else if ("1".equals(flag)) {
        if ("follow".equals(value.getSource())) {
            value.setUnionid(acc.getUnionid());
            value.setClickTime(acc.getClickTime());
            value.setSource("joined_flag");  
            followDataList.add(value);
            acc.setAllFollows(followDataList);
            windowState.put(windowkey, acc);
        }
    }

由于性能问题,原始的windowAPI对我来说不是一个有效的选择,我认为这里的唯一方法是使用processFunction + ontimer和Guava Cache,非常感谢

一种薄荷糖

我将windowState的类型从MapState更改为ValueState,问题已解决,也许是错误或其他问题,任何人都可以解释一下吗?

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

如何在Flink中为MapState中的所有项目设置TTL?

来自分类Dev

如何在Flink中为MapState中的所有项目设置TTL?

来自分类Dev

是整个MapState实例或MapState中每个元素的flink MapState TTL

来自分类Dev

清除 Flink MapState

来自分类Dev

Flink中的advanceToEndOfEventTime标志

来自分类Dev

为什么我在 Flink 中的 MapState 变量没有保留以前的值?

来自分类Dev

Apache Flink中的状态函数

来自分类Dev

如何理解Apache Flink中的setParallelism函数

来自分类Dev

Apache Flink中的重叠分区

来自分类Dev

Apache Flink中的并行度

来自分类Dev

Apache Flink中的全局排序

来自分类Dev

Apache Flink中Join的输出

来自分类Dev

Flink 中的动态 SQL 查询

来自分类Dev

Flink 在 timeWindow 上应用函数

来自分类Dev

单元测试 Flink 函数

来自分类Dev

HBaseWriteStreamExample,但Flink docs中没有HBaseReadStreamExample

来自分类Dev

Flink中没有窗口的乱序元组

来自分类Dev

flink reduceGroup中的迭代器行为

来自分类Dev

从Apache Flink中的输入文件创建对象

来自分类Dev

Flink中的RocksDBStateBackend:它是如何工作的?

来自分类Dev

Flink文件接收器中的容错

来自分类Dev

IntelliJ中Flink WordCount输出的数字

来自分类Dev

Flink中不推荐使用JSONDeserializationSchema()吗?

来自分类Dev

在Eclipse中编译Apache Flink示例

来自分类Dev

KafkaProducer08在Flink异常中

来自分类Dev

如何遍历Flink DataStream中的每条消息?

来自分类Dev

Apache Flink:ConnectedStreams中ValueState的范围

来自分类Dev

apache-flink:输出中的滑动窗口

来自分类Dev

Flink CEP 中的序列匹配语义