我在测试新的Flink 1.0.0功能时遇到了一些麻烦。我一直在修改CEP,但尚未设法运行一个简单的演示代码:
val pattern : Pattern[TrafficEvent, _] = Pattern.begin[TrafficEvent]("start")
val patternStream = CEP.pattern(stream.javaStream, pattern);
class MyPatternSelectFunction extends PatternSelectFunction[TrafficEvent, TrafficEvent] {
override def select(pattern : java.util.Map[String, TrafficEvent]) : TrafficEvent ={
pattern.get("start")
}
}
val alerts = patternStream.select(new MyPatternSelectFunction())
该代码可以很好地编译,并且maven不会显示任何警告。TrafficEvent是一个几乎没有简单字段的类,stream是该类的Scala DataStream。当代码在Flink上运行时,将显示该错误。它运行一秒钟,然后代码退出,并显示以下错误消息:
该程序完成,但有以下异常:
Input mismatch: Tuple type expected.
org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:878)
org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:302)
org.apache.flink.cep.PatternStream.select(PatternStream.java:64)
com.demo.DemoTraffic$.main(DemoTraffic.scala:311)
我试图通过构建像这样的静态类将功能迁移到Java(也许有一些奇怪的问题,从Scala调用API):
public static DataStream<DemoTraffic.trafficEvent> getStreamByPattern(DataStream<DemoTraffic.trafficEvent> stream) {
Pattern<DemoTraffic.trafficEvent, ?> pattern = Pattern.<DemoTraffic.trafficEvent>begin("start");
PatternStream<DemoTraffic.trafficEvent> patternStream = CEP.pattern(stream, pattern);
DataStream<DemoTraffic.trafficEvent> rvalue = patternStream.select(new PatternSelectFunction<DemoTraffic.trafficEvent, DemoTraffic.trafficEvent>() {
@Override
public DemoTraffic.trafficEvent select(Map<String, DemoTraffic.trafficEvent> pattern) throws Exception {
return pattern.get("start");
}
});
return rvalue;
}
但是结果是完全相同的,并且在PatternStream.select行中引发了相同的错误。关于我可以尝试或做错了什么的任何提示?如您所见,该模式非常愚蠢,并且仅用于测试目的。它仅接受所有事件,并作为响应返回该事件。使用Scala 2.10版本的Flink是1.0.0。
谢谢
我认为这TrafficEvent
是一个Scala案例类。CEP库是为Flink的Java API编写的,因此尚不支持Scala案例类。
解决方法是,您可以将案例类转换为普通的Scala类。
还有一张JIRA票证,用于跟踪CEP Scala API的开发。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句