Sparkストリーミングを使用してCassandraから読み取るときに問題が発生します。
上記のリンクとして、私は使用します
val rdd = ssc.cassandraTable("streaming_test", "key_value").select("key", "value").where("fu = ?", 3)
cassandraからデータを選択しますが、Sparkストリーミングにはクエリが1回しかないようですが、10センコンド間隔を使用してクエリを続行したいと思います。
私のコードは次のとおりです、あなたの応答を望みます。
ありがとう!
import org.apache.spark._
import org.apache.spark.streaming._
import com.datastax.spark.connector.streaming._
import org.apache.spark.rdd._
import scala.collection.mutable.Queue
object SimpleApp {
def main(args: Array[String]){
val conf = new SparkConf().setAppName("scala_streaming_test").set("spark.cassandra.connection.host", "127.0.0.1")
val ssc = new StreamingContext(conf, Seconds(10))
val rdd = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu")
//rdd.collect().foreach(println)
val rddQueue = new Queue[RDD[com.datastax.spark.connector.CassandraRow]]()
val dstream = ssc.queueStream(rddQueue)
dstream.print()
ssc.start()
rdd.collect().foreach(println)
rddQueue += rdd
ssc.awaitTermination()
}
}
CassandraRDDを入力としてConstantInputDStreamを作成できます。ConstantInputDStreamは、各ストリーミング間隔で同じRDDを提供し、そのRDDでアクションを実行することにより、RDD系統の実体化をトリガーし、毎回Cassandraでクエリを実行します。
クエリ時間が長くなり、ストリーミングプロセスが不安定になるのを防ぐために、クエリ対象のデータが無制限に大きくならないようにしてください。
このような何かがトリックを行うはずです(開始点としてコードを使用して):
import org.apache.spark.streaming.dstream.ConstantInputDStream
val ssc = new StreamingContext(conf, Seconds(10))
val cassandraRDD = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu")
val dstream = new ConstantInputDStream(ssc, cassandraRDD)
dstream.foreachRDD{ rdd =>
// any action will trigger the underlying cassandra query, using collect to have a simple output
println(rdd.collect.mkString("\n"))
}
ssc.start()
ssc.awaitTermination()
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加