Spark Streaming用のMQTTシンクを作成するにはどうすればよいですか?

フェリペ

SparkStreaming用のMQTTソース[1] [2]を作成する方法の例がいくつかありますただし、print()メソッドを使用する代わりに結果を公開できるMQTTシンクを作成したいと思いますMqttSinkを1つ作成しようとしましたが、object not serializableエラー発生します。次に、このブログに基づいてコードsendを作成していMqttSinkますが、オブジェクトで作成したメソッド見つかりません

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkConf}
import org.fusesource.mqtt.client.QoS
import org.sense.spark.util.{MqttSink, TaxiRideSource}

object TaxiRideCountCombineByKey {

  val mqttTopic: String = "spark-mqtt-sink"
  val qos: QoS = QoS.AT_LEAST_ONCE

  def main(args: Array[String]): Unit = {

    val outputMqtt: Boolean = if (args.length > 0 && args(0).equals("mqtt")) true else false

    // Create a local StreamingContext with two working thread and batch interval of 1 second.
    // The master requires 4 cores to prevent from a starvation scenario.
    val sparkConf = new SparkConf()
      .setAppName("TaxiRideCountCombineByKey")
      .setMaster("local[4]")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    val stream = ssc.receiverStream(new TaxiRideSource())
    val driverStream = stream.map(taxiRide => (taxiRide.driverId, 1))
    val countStream = driverStream.combineByKey(
      (v) => (v, 1), //createCombiner
      (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), //mergeValue
      (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2), // mergeCombiners
      new HashPartitioner(3)
    )

    if (outputMqtt) {
      println("Use the command below to consume data:")
      println("mosquitto_sub -h 127.0.0.1 -p 1883 -t " + mqttTopic)

      val mqttSink = ssc.sparkContext.broadcast(MqttSink)
      countStream.foreachRDD { rdd =>
        rdd.foreach { message =>
          mqttSink.value.send(mqttTopic, message.toString()) // "send" method does not exist
        }
      }
    } else {
      countStream.print()
    }

    ssc.start() // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
  }
}
import org.fusesource.mqtt.client.{FutureConnection, MQTT, QoS}

class MqttSink(createProducer: () => FutureConnection) extends Serializable {
  lazy val producer = createProducer()
  def send(topic: String, message: String): Unit = {
    producer.publish(topic, message.toString().getBytes, QoS.AT_LEAST_ONCE, false)
  }
}

object MqttSink {
  def apply(): MqttSink = {
    val f = () => {
      val mqtt = new MQTT()
      mqtt.setHost("localhost", 1883)
      val producer = mqtt.futureConnection()
      producer.connect().await()
      sys.addShutdownHook {
        producer.disconnect().await()
      }
      producer
    }
    new MqttSink(f)
  }
}
フェリペ

これは、ブログエントリのSparkとKafkaの統合パターンに基づく実用的な例です

package org.sense.spark.app

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkConf}
import org.fusesource.mqtt.client.QoS
import org.sense.spark.util.{MqttSink, TaxiRideSource}

object TaxiRideCountCombineByKey {

  val mqttTopic: String = "spark-mqtt-sink"
  val qos: QoS = QoS.AT_LEAST_ONCE

  def main(args: Array[String]): Unit = {

    val outputMqtt: Boolean = if (args.length > 0 && args(0).equals("mqtt")) true else false

    // Create a local StreamingContext with two working thread and batch interval of 1 second.
    // The master requires 4 cores to prevent from a starvation scenario.
    val sparkConf = new SparkConf()
      .setAppName("TaxiRideCountCombineByKey")
      .setMaster("local[4]")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    val stream = ssc.receiverStream(new TaxiRideSource())
    val driverStream = stream.map(taxiRide => (taxiRide.driverId, 1))
    val countStream = driverStream.combineByKey(
      (v) => (v, 1), //createCombiner
      (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), //mergeValue
      (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2), // mergeCombiners
      new HashPartitioner(3)
    )

    if (outputMqtt) {
      println("Use the command below to consume data:")
      println("mosquitto_sub -h 127.0.0.1 -p 1883 -t " + mqttTopic)

      val mqttSink = ssc.sparkContext.broadcast(MqttSink())
      countStream.foreachRDD { rdd =>
        rdd.foreach { message =>
          mqttSink.value.send(mqttTopic, message.toString()) // "send" method does not exist
        }
      }
    } else {
      countStream.print()
    }

    ssc.start() // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
  }
}
package org.sense.spark.util

import org.fusesource.mqtt.client.{FutureConnection, MQTT, QoS}

class MqttSink(createProducer: () => FutureConnection) extends Serializable {

  lazy val producer = createProducer()

  def send(topic: String, message: String): Unit = {
    producer.publish(topic, message.toString().getBytes, QoS.AT_LEAST_ONCE, false)
  }
}

object MqttSink {
  def apply(): MqttSink = {
    val f = () => {
      val mqtt = new MQTT()
      mqtt.setHost("localhost", 1883)
      val producer = mqtt.futureConnection()
      producer.connect().await()
      sys.addShutdownHook {
        producer.disconnect().await()
      }
      producer
    }
    new MqttSink(f)
  }
}
package org.sense.spark.util

import java.io.{BufferedReader, FileInputStream, InputStreamReader}
import java.nio.charset.StandardCharsets
import java.util.Locale
import java.util.zip.GZIPInputStream

import org.apache.spark.storage._
import org.apache.spark.streaming.receiver._
import org.joda.time.DateTime
import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}

case class TaxiRide(rideId: Long, isStart: Boolean, startTime: DateTime, endTime: DateTime,
                    startLon: Float, startLat: Float, endLon: Float, endLat: Float,
                    passengerCnt: Short, taxiId: Long, driverId: Long)

object TimeFormatter {
  val timeFormatter: DateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withLocale(Locale.US).withZoneUTC()
}

class TaxiRideSource extends Receiver[TaxiRide](StorageLevel.MEMORY_AND_DISK_2) {
  val dataFilePath = "/home/flink/nycTaxiRides.gz";
  var dataRateListener: DataRateListener = _

  /**
   * Start the thread that receives data over a connection
   */
  def onStart() {
    dataRateListener = new DataRateListener()
    dataRateListener.start()
    new Thread("TaxiRide Source") {
      override def run() {
        receive()
      }
    }.start()
  }

  def onStop() {}

  /**
   * Periodically generate a TaxiRide event and regulate the emission frequency
   */
  private def receive() {
    while (!isStopped()) {
      val gzipStream = new GZIPInputStream(new FileInputStream(dataFilePath))
      val reader: BufferedReader = new BufferedReader(new InputStreamReader(gzipStream, StandardCharsets.UTF_8))
      try {
        var line: String = null
        do {
          // start time before reading the line
          val startTime = System.nanoTime

          // read the line on the file and yield the object
          line = reader.readLine
          if (line != null) {
            val taxiRide: TaxiRide = getTaxiRideFromString(line)
            store(taxiRide)
          }

          // regulate frequency of the source
          dataRateListener.busySleep(startTime)
        } while (line != null)
      } finally {
        reader.close
      }
    }
  }

  def getTaxiRideFromString(line: String): TaxiRide = {
    // println(line)
    val tokens: Array[String] = line.split(",")
    if (tokens.length != 11) {
      throw new RuntimeException("Invalid record: " + line)
    }

    val rideId: Long = tokens(0).toLong
    val (isStart, startTime, endTime) = tokens(1) match {
      case "START" => (true, DateTime.parse(tokens(2), TimeFormatter.timeFormatter), DateTime.parse(tokens(3), TimeFormatter.timeFormatter))
      case "END" => (false, DateTime.parse(tokens(2), TimeFormatter.timeFormatter), DateTime.parse(tokens(3), TimeFormatter.timeFormatter))
      case _ => throw new RuntimeException("Invalid record: " + line)
    }
    val startLon: Float = if (tokens(4).length > 0) tokens(4).toFloat else 0.0f
    val startLat: Float = if (tokens(5).length > 0) tokens(5).toFloat else 0.0f
    val endLon: Float = if (tokens(6).length > 0) tokens(6).toFloat else 0.0f
    val endLat: Float = if (tokens(7).length > 0) tokens(7).toFloat else 0.0f
    val passengerCnt: Short = tokens(8).toShort
    val taxiId: Long = tokens(9).toLong
    val driverId: Long = tokens(10).toLong

    TaxiRide(rideId, isStart, startTime, endTime, startLon, startLat, endLon, endLat, passengerCnt, taxiId, driverId)
  }
}

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

分類Dev

Spark AR(Facebook用)でボタンを作成して使用するにはどうすればよいですか?

分類Dev

Spark Streamingアプリケーションの統計エンドポイントにアクセスするにはどうすればよいですか?

分類Dev

spark-dataframesタイプのDataset <Row>の配列またはコレクションを作成するにはどうすればよいですか?

分類Dev

Sparkテーブルでインデックスを作成するにはどうすればよいですか?

分類Dev

Sparkを使用してWHEREINで長いクエリを作成するにはどうすればよいですか?

分類Dev

Sparkで「and」を使用してsqlselectに空の条件を適用するにはどうすればよいですか?

分類Dev

Spark Structured Streamingによって作成された古いデータを削除するにはどうすればよいですか?

分類Dev

Spark Streamingでタプルを処理するにはどうすればよいですか?

分類Dev

Spark Structured Streamingでバッチ間隔を指定するにはどうすればよいですか?

分類Dev

Sparkストリーミングで停止条件を作成するにはどうすればよいですか?

分類Dev

Spark 2.1.0 で適合した PipelineModelS の配列を作成するにはどうすればよいですか?

分類Dev

SparkのrepartitionAndSortWithinPartitionsを使用するにはどうすればよいですか?

分類Dev

Spark Javaアプリケーション用に自動的に定義されたポートを取得するにはどうすればよいですか?

分類Dev

Spark Structured Streamingでオフセットを管理するにはどうすればよいですか?(_spark_metadataの問題)

分類Dev

Scala / Sparkで複数のDataFrameから複数のシートを使用してExcelファイルを作成するにはどうすればよいですか?

分類Dev

Spark Dataframeで列のコンテンツ全体を表示するにはどうすればよいですか?

分類Dev

SparkアプリケーションでDataFrame(Scala)からCSVファイルを作成するにはどうすればよいですか?

分類Dev

Spark ORCインデックスを使用するにはどうすればよいですか?

分類Dev

Sparkに参加してネストされた列を作成するにはどうすればよいですか?

分類Dev

Spark Shellの2つのバージョンを使用するにはどうすればよいですか?

分類Dev

Spark Scalaで主キーを自動的に作成するにはどうすればよいですか?

分類Dev

Spark:列の配列をクエリするにはどうすればよいですか?

分類Dev

パンダにSparkクラスターを使用させるにはどうすればよいですか

分類Dev

KubernetesからSparkシェルにアクセスするにはどうすればよいですか?

分類Dev

Spark Structured Streamingで静的データフレームをストリーミングフレームと比較するにはどうすればよいですか?

分類Dev

Spark構造化ストリーミングを使用する場合、Spark Streamingのように、現在のバッチの集計結果を取得するにはどうすればよいですか?

分類Dev

pysparkとSparkSQLを作成してHiveon Sparkを実行するにはどうすればよいですか?

分類Dev

Sparkでフィルタリングを使用してノードのペアを作成するにはどうすればよいですか?

分類Dev

単一のSparkアプリケーションでより多くのStreamingContextを使用するにはどうすればよいですか?

Related 関連記事

  1. 1

    Spark AR(Facebook用)でボタンを作成して使用するにはどうすればよいですか?

  2. 2

    Spark Streamingアプリケーションの統計エンドポイントにアクセスするにはどうすればよいですか?

  3. 3

    spark-dataframesタイプのDataset <Row>の配列またはコレクションを作成するにはどうすればよいですか?

  4. 4

    Sparkテーブルでインデックスを作成するにはどうすればよいですか?

  5. 5

    Sparkを使用してWHEREINで長いクエリを作成するにはどうすればよいですか?

  6. 6

    Sparkで「and」を使用してsqlselectに空の条件を適用するにはどうすればよいですか?

  7. 7

    Spark Structured Streamingによって作成された古いデータを削除するにはどうすればよいですか?

  8. 8

    Spark Streamingでタプルを処理するにはどうすればよいですか?

  9. 9

    Spark Structured Streamingでバッチ間隔を指定するにはどうすればよいですか?

  10. 10

    Sparkストリーミングで停止条件を作成するにはどうすればよいですか?

  11. 11

    Spark 2.1.0 で適合した PipelineModelS の配列を作成するにはどうすればよいですか?

  12. 12

    SparkのrepartitionAndSortWithinPartitionsを使用するにはどうすればよいですか?

  13. 13

    Spark Javaアプリケーション用に自動的に定義されたポートを取得するにはどうすればよいですか?

  14. 14

    Spark Structured Streamingでオフセットを管理するにはどうすればよいですか?(_spark_metadataの問題)

  15. 15

    Scala / Sparkで複数のDataFrameから複数のシートを使用してExcelファイルを作成するにはどうすればよいですか?

  16. 16

    Spark Dataframeで列のコンテンツ全体を表示するにはどうすればよいですか?

  17. 17

    SparkアプリケーションでDataFrame(Scala)からCSVファイルを作成するにはどうすればよいですか?

  18. 18

    Spark ORCインデックスを使用するにはどうすればよいですか?

  19. 19

    Sparkに参加してネストされた列を作成するにはどうすればよいですか?

  20. 20

    Spark Shellの2つのバージョンを使用するにはどうすればよいですか?

  21. 21

    Spark Scalaで主キーを自動的に作成するにはどうすればよいですか?

  22. 22

    Spark:列の配列をクエリするにはどうすればよいですか?

  23. 23

    パンダにSparkクラスターを使用させるにはどうすればよいですか

  24. 24

    KubernetesからSparkシェルにアクセスするにはどうすればよいですか?

  25. 25

    Spark Structured Streamingで静的データフレームをストリーミングフレームと比較するにはどうすればよいですか?

  26. 26

    Spark構造化ストリーミングを使用する場合、Spark Streamingのように、現在のバッチの集計結果を取得するにはどうすればよいですか?

  27. 27

    pysparkとSparkSQLを作成してHiveon Sparkを実行するにはどうすればよいですか?

  28. 28

    Sparkでフィルタリングを使用してノードのペアを作成するにはどうすればよいですか?

  29. 29

    単一のSparkアプリケーションでより多くのStreamingContextを使用するにはどうすればよいですか?

ホットタグ

アーカイブ