SparkStreamingを使用したCassandraからの読み取り

ヤオユウ

Sparkストリーミングを使用してCassandraから読み取るときに問題が発生します。

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md#reading-from-cassandra-from-the-streamingcontext

上記のリンクとして、私は使用します

val rdd = ssc.cassandraTable("streaming_test", "key_value").select("key", "value").where("fu = ?", 3)

cassandraからデータを選択しますが、Sparkストリーミングにはクエリが1回しかないようですが、10センコンド間隔を使用してクエリを続行したいと思います。

私のコードは次のとおりです、あなたの応答を望みます。

ありがとう!

import org.apache.spark._
import org.apache.spark.streaming._
import com.datastax.spark.connector.streaming._
import org.apache.spark.rdd._
import scala.collection.mutable.Queue


object SimpleApp {
def main(args: Array[String]){
    val conf = new SparkConf().setAppName("scala_streaming_test").set("spark.cassandra.connection.host", "127.0.0.1")

    val ssc = new StreamingContext(conf, Seconds(10))

    val rdd = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu")

    //rdd.collect().foreach(println)

    val rddQueue = new Queue[RDD[com.datastax.spark.connector.CassandraRow]]()


    val dstream = ssc.queueStream(rddQueue)

    dstream.print()

    ssc.start()
    rdd.collect().foreach(println)
    rddQueue += rdd
    ssc.awaitTermination()
}  

}

maasg

CassandraRDDを入力としてConstantInputDStreamを作成できます。ConstantInputDStreamは、各ストリーミング間隔で同じRDDを提供し、そのRDDでアクションを実行することにより、RDD系統の実体化をトリガーし、毎回Cassandraでクエリを実行します。

クエリ時間が長くなり、ストリーミングプロセスが不安定になるのを防ぐために、クエリ対象のデータが無制限に大きくならないようにしてください。

このような何かがトリックを行うはずです(開始点としてコードを使用して):

import org.apache.spark.streaming.dstream.ConstantInputDStream

val ssc = new StreamingContext(conf, Seconds(10))

val cassandraRDD = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu")

val dstream = new ConstantInputDStream(ssc, cassandraRDD)

dstream.foreachRDD{ rdd => 
    // any action will trigger the underlying cassandra query, using collect to have a simple output
    println(rdd.collect.mkString("\n")) 
}
ssc.start()
ssc.awaitTermination()

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

分類Dev

VB.NETを使用したAPIからの読み取り

分類Dev

変数pythonを使用したmysqlからの読み取り/保存

分類Dev

任意の区切り文字を使用したFileStreamからの読み取り

分類Dev

GridFを使用したmongoDBからの画像の読み取りと表示

分類Dev

Akka Streams2.4.2とSlick3.0を使用したpostgresからの読み取り

分類Dev

PHPを使用したデータベースからの読み取り

分類Dev

LINQを使用したXMLファイルからの読み取り

分類Dev

JQueryを使用したDIVランダムデータからの読み取り

分類Dev

動的割り当てを使用したCを使用したCSVからの値の読み取りと保存

分類Dev

datastaxcassandraを使用したCassandra読み取り操作エラー

分類Dev

Javaを使用してcassandraからデータを読み取る

分類Dev

C#を使用したファイルの読み取り、書き込み、ファイルからの読み取り、ファイルへの書き込み

分類Dev

Datastax spark-cassandra-connectorを使用してCassandraテーブルから特定の列を読み取る方法は?

分類Dev

InputStreamからの読み取りとFileOutputStreamを使用した書き込みの高速化

分類Dev

FTPからの画像の読み取りでの使用の最後に「例外をスローしました」

分類Dev

Ajaxを使用してList <long>から値を読み取り、別のHTML要素に割り当てたい

分類Dev

Pythonを使用したUnixソケット接続からの読み取りと書き込み

分類Dev

openFileOutputを使用したテキストファイルからの読み取り/書き込み

分類Dev

Spark:Cassandraから毎日読み取り、寄木細工の床に保存します。新しい行のみを読み取る方法は?

分類Dev

std :: cin関数を使用した後のファイルからの読み取りを修正する方法

分類Dev

READ / PARTまたはREAD / SEEKを使用したURLからの部分的な読み取り

分類Dev

Cの文字配列からの文字を使用したマルチスレッドの読み取り/実行

分類Dev

タスクを使用したFirestoreからのドキュメント参照の複数の読み取り

分類Dev

Cassandraからデータを並行して読み取るための最良の方法は何ですか?

分類Dev

CassandraからのDataflow読み取り並列処理の低下を防ぐ方法

分類Dev

ファイルから読み取りたい変数の名前として配列要素を使用します

分類Dev

10進数を使用したCSVからのデータ読み取りの潜在的な損失

分類Dev

表形式のSQLクエリを使用してドライブパスからXMLを読み取りたい

分類Dev

jdbcからPLSQLを実行した後のDBMS_Ouptutの読み取り

Related 関連記事

  1. 1

    VB.NETを使用したAPIからの読み取り

  2. 2

    変数pythonを使用したmysqlからの読み取り/保存

  3. 3

    任意の区切り文字を使用したFileStreamからの読み取り

  4. 4

    GridFを使用したmongoDBからの画像の読み取りと表示

  5. 5

    Akka Streams2.4.2とSlick3.0を使用したpostgresからの読み取り

  6. 6

    PHPを使用したデータベースからの読み取り

  7. 7

    LINQを使用したXMLファイルからの読み取り

  8. 8

    JQueryを使用したDIVランダムデータからの読み取り

  9. 9

    動的割り当てを使用したCを使用したCSVからの値の読み取りと保存

  10. 10

    datastaxcassandraを使用したCassandra読み取り操作エラー

  11. 11

    Javaを使用してcassandraからデータを読み取る

  12. 12

    C#を使用したファイルの読み取り、書き込み、ファイルからの読み取り、ファイルへの書き込み

  13. 13

    Datastax spark-cassandra-connectorを使用してCassandraテーブルから特定の列を読み取る方法は?

  14. 14

    InputStreamからの読み取りとFileOutputStreamを使用した書き込みの高速化

  15. 15

    FTPからの画像の読み取りでの使用の最後に「例外をスローしました」

  16. 16

    Ajaxを使用してList <long>から値を読み取り、別のHTML要素に割り当てたい

  17. 17

    Pythonを使用したUnixソケット接続からの読み取りと書き込み

  18. 18

    openFileOutputを使用したテキストファイルからの読み取り/書き込み

  19. 19

    Spark:Cassandraから毎日読み取り、寄木細工の床に保存します。新しい行のみを読み取る方法は?

  20. 20

    std :: cin関数を使用した後のファイルからの読み取りを修正する方法

  21. 21

    READ / PARTまたはREAD / SEEKを使用したURLからの部分的な読み取り

  22. 22

    Cの文字配列からの文字を使用したマルチスレッドの読み取り/実行

  23. 23

    タスクを使用したFirestoreからのドキュメント参照の複数の読み取り

  24. 24

    Cassandraからデータを並行して読み取るための最良の方法は何ですか?

  25. 25

    CassandraからのDataflow読み取り並列処理の低下を防ぐ方法

  26. 26

    ファイルから読み取りたい変数の名前として配列要素を使用します

  27. 27

    10進数を使用したCSVからのデータ読み取りの潜在的な損失

  28. 28

    表形式のSQLクエリを使用してドライブパスからXMLを読み取りたい

  29. 29

    jdbcからPLSQLを実行した後のDBMS_Ouptutの読み取り

ホットタグ

アーカイブ