我已经基于Java中基于Scala的HdfsCount示例成功构建了一个非常简单的Spark Streaming应用程序。
当我将此应用程序提交到本地Spark时,它等待将文件写入给定目录,并且当我创建该文件时,它会成功打印字数。我按Ctrl + C终止应用程序。
现在,我已经尝试为此功能创建一个非常基本的单元测试,但是在测试中,我无法打印相同的信息,即字数。
我想念什么?
以下是单元测试文件,此后,我还包括了显示countWords方法的代码段:
import com.google.common.io.Files;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.junit.*;
import java.io.*;
public class StarterAppTest {
JavaStreamingContext ssc;
File tempDir;
@Before
public void setUp() {
ssc = new JavaStreamingContext("local", "test", new Duration(3000));
tempDir = Files.createTempDir();
tempDir.deleteOnExit();
}
@After
public void tearDown() {
ssc.stop();
ssc = null;
}
@Test
public void testInitialization() {
Assert.assertNotNull(ssc.sc());
}
@Test
public void testCountWords() {
StarterApp starterApp = new StarterApp();
try {
JavaDStream<String> lines = ssc.textFileStream(tempDir.getAbsolutePath());
JavaPairDStream<String, Integer> wordCounts = starterApp.countWords(lines);
ssc.start();
File tmpFile = new File(tempDir.getAbsolutePath(), "tmp.txt");
PrintWriter writer = new PrintWriter(tmpFile, "UTF-8");
writer.println("8-Dec-2014: Emre Emre Emre Ergin Ergin Ergin");
writer.close();
System.err.println("===== Word Counts =======");
wordCounts.print();
System.err.println("===== Word Counts =======");
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
Assert.assertTrue(true);
}
}
该测试会编译并开始运行,Spark Streaming在控制台上会打印很多诊断消息,但是对调用的内容wordCounts.print()
不会打印任何内容,而在StarterApp.java本身中,它们会打印。
我也尝试过添加ssc.awaitTermination();
,ssc.start()
但是在这方面没有任何改变。之后,我还尝试在此Spark Streaming应用程序正在检查的目录中手动创建一个新文件,但这一次它给出了错误。
为了完整起见,下面是wordCounts方法:
public JavaPairDStream<String, Integer> countWords(JavaDStream<String> lines) {
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) { return Lists.newArrayList(SPACE.split(x)); }
});
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) { return new Tuple2<>(s, 1); }
}).reduceByKey((i1, i2) -> i1 + i2);
return wordCounts;
}
几个指针:
ssc.start
发出文件后立即创建文件。不能保证文件系统监听器已经存在。我会做一些sleep(xx)
后ssc.start
在流媒体中,一切都与正确的时机有关。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句