我试图增强显示流用法的Flink示例。我的目标是使用窗口功能(请参见window
函数调用)。我假设下面的代码输出流的最后3个数字的总和。(由于nc -lk 9999
ubuntu上的,因此打开了流)实际上,输出汇总了所有输入的数字。切换到时间窗口会产生相同的结果,即不产生窗口。
那是个错误吗?(使用的版本:github上的最新版本)
object SocketTextStreamWordCount {
def main(args: Array[String]) {
val hostName = args(0)
val port = args(1).toInt
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Create streams for names and ages by mapping the inputs to the corresponding objects
val text = env.socketTextStream(hostName, port)
val currentMap = text.flatMap { (x:String) => x.toLowerCase.split("\\W+") }
.filter { (x:String) => x.nonEmpty }
.window(Count.of(3)).every(Time.of(1, TimeUnit.SECONDS))
// .window(Time.of(5, TimeUnit.SECONDS)).every(Time.of(1, TimeUnit.SECONDS))
.map { (x:String) => ("not used; just to have a tuple for the sum", x.toInt) }
val numberOfItems = currentMap.count
numberOfItems print
val counts = currentMap.sum( 1 )
counts print
env.execute("Scala SocketTextStreamWordCount Example")
}
}
问题似乎是存在从WindowedDataStream
到的隐式转换DataStream
。这种隐式转换调用flatten()
的WindowedDataStream
。
在您的情况下,代码将扩展为:
val currentMap = text.flatMap { (x:String) => x.toLowerCase.split("\\W+") }
.filter { (x:String) => x.nonEmpty }
.window(Count.of(3)).every(Time.of(1, TimeUnit.SECONDS))
.flatten()
.map { (x:String) => ("not used; just to have a tuple for the sum",x.toInt) }
是什么flatten()
做的是类似flatMap()
的集合。它采用窗口流,该窗口流可以看作是collections的集合[[a,b,c], [d,e,f]]
,并将其转换为元素流:[a,b,c,d,e,f]
。
这意味着您的计数实际上仅对已窗口化和“去窗口化”的原始流起作用。看起来它从未被窗口化。
这是一个问题,我将立即进行修复。(我是Flink的提交者之一。)您可以在这里跟踪进度:https : //issues.apache.org/jira/browse/FLINK-2096
使用当前API的方法是:
val currentMap = text.flatMap { (x:String) => x.toLowerCase.split("\\W+") }
.filter { (x:String) => x.nonEmpty }
.map { (x:String) => ("not used; just to have a tuple for the sum",x.toInt) }
.window(Count.of(3)).every(Time.of(1, TimeUnit.SECONDS))
WindowedDataStream
具有sum()方法,因此不会隐式插入flatten()调用。不幸的是,count()
它不可用,WindowedDataStream
因此您必须手动将一个1
字段添加到元组并计数。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句