我是 Spark 的 Streaming 框架的新手,正在尝试处理 twitter 流。我正在为相同的测试用例编写测试用例,并且知道我可以使用 Spark StreamingSuiteBase,这将帮助我将输入作为流测试我的函数。但是我写了一个函数,它以 DStream[Status] 作为输入,处理后给出 DStream[String] 作为输出。我从 StreamingSuiteBase 使用的 api 是 testOperation。
test("Filter only words Starting with #") {
val inputTweet = List(List("this is #firstHash"), List("this is #secondHash"), List("this is #thirdHash"))
val expected = List(List("#firstHash"), List("#secondHash"), List("#thirdHash"))
testOperation(inputTweet, TransformTweets.getText _, expected, ordered = false)
这是发送输入的函数..
def getText(englishTweets: DStream[Status]): DStream[String] = {
println(englishTweets.toString)
val hashTags = englishTweets.flatMap(x => x.getText.split(" ").filter(_.startsWith("#")))
hashTags
}
但是由于 DStream[Status] 和 DStream[String],我收到错误“类型不匹配”。我如何模拟 Stream[Status]。
所以,我从“获取Twitter状态解决这个问题createStatus
”的API TwitterObjectFactory
。没有必要嘲笑TwitterStatus
。即使您设法模拟它,也存在序列化问题。所以,这是最好的解决方案:
val rawJson = Source.fromURL(getClass.getResource("/tweetStatus.json")).getLines.mkString
val tweetStatus = TwitterObjectFactory.createStatus(rawJson)
希望这有助于某人!
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句