我的代码使用readTextFile读取日志文件,当我在Flink(/opt/flink-1.0.3/bin/flink run -m yarn-cluster -yn 2 /home/flink/flink-json-0.1.jar
)中运行jar时,它将成功处理内部的行并停止我的应用程序,而不必等待新行。我需要一些参数吗?
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("hdfs:///test/ignicion.io")
先感谢您
你在找
StreamExecutionEnvironment.readFileStream(String filePath, long intervalMillis, WatchType watchType)
对于WatchType,您可以使用以下选项
来自的流
StreamExecutionEnvironment.readTextFile(String filePath, String charsetName)
在读取所有文件后将完成。我认为,它主要用于开发过程中的本地测试。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句