我想通过processKeyedFunction实现aggregationFunction,因为默认的aggregationFunction不支持丰富的功能,此外,我尝试了aggreagationFunction + processWindowFunction(https://ci.apache.org/projects/flink/flink-docs-stable/dev/ stream / operators / windows.html),但它也不能满足我的需求,因此我必须使用基本的processKeyedFunction来实现aggregationFunction,我的问题如下:
在processFunction中,我定义一个windowState用于阶段元素的聚合值,代码如下:
public void open(Configuration parameters) throws Exception {
followCacheMap = FollowSet.getInstance();
windowState = getRuntimeContext().getMapState(windowStateDescriptor);
currentTimer = getRuntimeContext().getState(new ValueStateDescriptor<Long>(
"timer",
Long.class
));
在processElement()函数中,我使用windowState(在open函数中启动的MapState)聚合窗口元素,并注册第一个timeServie以清除当前窗口状态,代码如下:
@Override
public void processElement(FollowData value, Context ctx, Collector<FollowData> out) throws Exception
{
if ( (currentTimer==null || (currentTimer.value() ==null) || (long)currentTimer.value()==0 ) && value.getClickTime() != null) {
currentTimer.update(value.getClickTime() + interval);
ctx.timerService().registerEventTimeTimer((long)currentTimer.value());
}
windowState = doMyAggregation(value);
}
在onTimer()函数中,首先,我在下一分钟内注册next timeService,并清除窗口状态
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<FollowData> out) throws Exception {
currentTimer.update(timestamp + interval); // interval is 1 minute
ctx.timerService().registerEventTimeTimer((long)currentTimer.value());
out.collect(windowState);
windowState.clear();
}
但是当程序运行时,我发现onTimer中的所有windowState都是空的,但在processElement()函数中不是empyt,我不知道为什么会这样,也许执行逻辑不同,如何解决这个问题, 提前致谢 !
windowState是一个MapState,键是“ mykey”,值是一个自定义对象AggregateFollow
public class AggregateFollow {
private String clicked;
private String unionid;
private ArrayList allFollows;
private int enterCnt;
private Long clickTime;
}
并且doMyAggregation(value)函数非常像这样,doMyAggregation的功能是获取源字段为“ follow”的所有值,但是如果在1分钟内没有字段为“ click”的值,则“ follow” ”的值应该过时了,总之,就像“跟随”数据和“点击”数据的联接操作,
AggregateFollow acc = windowState.get(windowkey);
String flag = acc.getClicked();
ArrayList<FollowData> followDataList = acc.getAllFollows();
if ("0".equals(flag)) {
if ("follow".equals(value.getSource())) {
followDataList.add(value);
acc.setAllFollows(followDataList);
}
if ("click".equals(value.getSource())) {
String unionid = value.getUnionid();
clickTime = value.getClickTime();
if (followDataList.size() > 0) {
ArrayList listNew = new ArrayList();
for (FollowData followData : followDataList) {
followData.setUnionid(unionid);
followData.setClickTime(clickTime);
followData.setSource("joined_flag"); //
}
acc.setAllFollows(listNew);
}
acc.setClicked("1");
acc.setUnionid(unionid);
acc.setClickTime(clickTime);
windowState.put(windowkey, acc);
}
} else if ("1".equals(flag)) {
if ("follow".equals(value.getSource())) {
value.setUnionid(acc.getUnionid());
value.setClickTime(acc.getClickTime());
value.setSource("joined_flag");
followDataList.add(value);
acc.setAllFollows(followDataList);
windowState.put(windowkey, acc);
}
}
由于性能问题,原始的windowAPI对我来说不是一个有效的选择,我认为这里的唯一方法是使用processFunction + ontimer和Guava Cache,非常感谢
我将windowState的类型从MapState更改为ValueState,问题已解决,也许是错误或其他问题,任何人都可以解释一下吗?
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句