如何在单元测试中使Spark Streaming计数文件中的单词?

EmreSevinç

我已经基于Java中基于ScalaHdfsCount示例成功构建了一个非常简单的Spark Streaming应用程序

当我将此应用程序提交到本地Spark时,它等待将文件写入给定目录,并且当我创建该文件时,它会成功打印字数。我按Ctrl + C终止应用程序。

现在,我已经尝试为此功能创建一个非常基本的单元测试,但是在测试中,我无法打印相同的信息,即字数。

我想念什么?

以下是单元测试文件,此后,我还包括了显示countWords方法的代码段:

StarterAppTest.java

import com.google.common.io.Files;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;


import org.junit.*;

import java.io.*;

public class StarterAppTest {

  JavaStreamingContext ssc;
  File tempDir;

  @Before
  public void setUp() {
    ssc = new JavaStreamingContext("local", "test", new Duration(3000));
    tempDir = Files.createTempDir();
    tempDir.deleteOnExit();
  }

  @After
  public void tearDown() {
    ssc.stop();
    ssc = null;
  }

  @Test
  public void testInitialization() {
    Assert.assertNotNull(ssc.sc());
  }


  @Test
  public void testCountWords() {

    StarterApp starterApp = new StarterApp();

    try {
      JavaDStream<String> lines = ssc.textFileStream(tempDir.getAbsolutePath());
      JavaPairDStream<String, Integer> wordCounts = starterApp.countWords(lines);

      ssc.start();

      File tmpFile = new File(tempDir.getAbsolutePath(), "tmp.txt");
      PrintWriter writer = new PrintWriter(tmpFile, "UTF-8");
      writer.println("8-Dec-2014: Emre Emre Emre Ergin Ergin Ergin");
      writer.close();

      System.err.println("===== Word Counts =======");
      wordCounts.print();
      System.err.println("===== Word Counts =======");

    } catch (FileNotFoundException e) {
      e.printStackTrace();
    } catch (UnsupportedEncodingException e) {
      e.printStackTrace();
    }


    Assert.assertTrue(true);

  }

}

该测试会编译并开始运行,Spark Streaming在控制台上会打印很多诊断消息,但是对调用的内容wordCounts.print()不会打印任何内容,而在StarterApp.java本身中,它们会打印。

我也尝试过添加ssc.awaitTermination();ssc.start()但是在这方面没有任何改变。之后,我还尝试在此Spark Streaming应用程序正在检查的目录中手动创建一个新文件,但这一次它给出了错误。

为了完整起见,下面是wordCounts方法:

public JavaPairDStream<String, Integer> countWords(JavaDStream<String> lines) {
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) { return Lists.newArrayList(SPACE.split(x)); }
    });

    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
            new PairFunction<String, String, Integer>() {
              @Override
              public Tuple2<String, Integer> call(String s) { return new Tuple2<>(s, 1); }
            }).reduceByKey((i1, i2) -> i1 + i2);

    return wordCounts;
  }
马斯格

几个指针:

  • 给SparkStreaming上下文至少两个内核。1用于流式传输,1用于Spark处理。“本地”->“本地[2]”
  • 流式传输间隔为3000毫秒,因此您需要在程序中的某个位置(至少)等待该时间才能获得输出。
  • Spark Streaming需要一些时间来设置侦听器。ssc.start发出文件后立即创建文件不能保证文件系统监听器已经存在。我会做一些sleep(xx)ssc.start

在流媒体中,一切都与正确的时机有关。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

如何在单元测试中禁止Spark日志记录?

来自分类Dev

如何在仪器化的单元测试中使用文件

来自分类Dev

如何在仪器化的单元测试中使用文件

来自分类Dev

在单元测试中,如何在HttpClient中使用FakeItEasy?

来自分类Dev

如何在Struts单元测试中测试文件上传?

来自分类Dev

如何在Struts单元测试中测试文件上传?

来自分类Dev

模拟 API 响应时如何在单元测试中使用 JSON 文件

来自分类Dev

如何在Xcode 6.1.1中使用Google Test运行本地C ++单元测试

来自分类Dev

如何在Yii2中使用Codeception运行单元测试?

来自分类Dev

如何在单元测试中使用Moq在异步方法中返回传递的参数?

来自分类Dev

如何在Angular 8中使用对象模拟数组数据以进行单元测试

来自分类Dev

如何在 Laravel 5 中使用 Codeception 在单元测试中模拟身份验证用户?

来自分类Dev

如何在REPL中的Python中运行单元测试?

来自分类Dev

如何在Spring单元测试中快速模拟服务?

来自分类Dev

如何在Swift中设置单元测试?

来自分类Dev

如何在AngularJS单元测试中模拟诺言的结果?

来自分类Dev

如何在AngularJS单元测试中解决承诺

来自分类Dev

如何在Python单元测试中模拟类?

来自分类常见问题

如何在Django中跳过单元测试?

来自分类Dev

如何在AngularJS Karma单元测试中触发`$ on`事件?

来自分类Dev

如何在角度的单元测试中模拟警报

来自分类Dev

如何在ember.js中执行单元测试?

来自分类Dev

如何在Java中对ZipOutputStream进行单元测试

来自分类Dev

如何在CRM 2011开发中编写单元测试

来自分类Dev

如何在R中制作单元测试块?

来自分类Dev

如何在pycharm中运行App Engine单元测试?

来自分类Dev

如何在单元测试中模拟AngularFire 2服务?

来自分类Dev

如何在单元测试中模拟HttpRequest的UserAgent属性?

来自分类Dev

如何在MVP中对演示者进行单元测试

Related 相关文章

  1. 1

    如何在单元测试中禁止Spark日志记录?

  2. 2

    如何在仪器化的单元测试中使用文件

  3. 3

    如何在仪器化的单元测试中使用文件

  4. 4

    在单元测试中,如何在HttpClient中使用FakeItEasy?

  5. 5

    如何在Struts单元测试中测试文件上传?

  6. 6

    如何在Struts单元测试中测试文件上传?

  7. 7

    模拟 API 响应时如何在单元测试中使用 JSON 文件

  8. 8

    如何在Xcode 6.1.1中使用Google Test运行本地C ++单元测试

  9. 9

    如何在Yii2中使用Codeception运行单元测试?

  10. 10

    如何在单元测试中使用Moq在异步方法中返回传递的参数?

  11. 11

    如何在Angular 8中使用对象模拟数组数据以进行单元测试

  12. 12

    如何在 Laravel 5 中使用 Codeception 在单元测试中模拟身份验证用户?

  13. 13

    如何在REPL中的Python中运行单元测试?

  14. 14

    如何在Spring单元测试中快速模拟服务?

  15. 15

    如何在Swift中设置单元测试?

  16. 16

    如何在AngularJS单元测试中模拟诺言的结果?

  17. 17

    如何在AngularJS单元测试中解决承诺

  18. 18

    如何在Python单元测试中模拟类?

  19. 19

    如何在Django中跳过单元测试?

  20. 20

    如何在AngularJS Karma单元测试中触发`$ on`事件?

  21. 21

    如何在角度的单元测试中模拟警报

  22. 22

    如何在ember.js中执行单元测试?

  23. 23

    如何在Java中对ZipOutputStream进行单元测试

  24. 24

    如何在CRM 2011开发中编写单元测试

  25. 25

    如何在R中制作单元测试块?

  26. 26

    如何在pycharm中运行App Engine单元测试?

  27. 27

    如何在单元测试中模拟AngularFire 2服务?

  28. 28

    如何在单元测试中模拟HttpRequest的UserAgent属性?

  29. 29

    如何在MVP中对演示者进行单元测试

热门标签

归档