Spark-アクターからのストリーミング

ウィップマン

消費者アクターにKafkaトピックをサブスクライブさせ、消費者の外部でSparkStreamingを使用してさらに処理するためにデータをストリーミングしてもらいたいです。なぜ俳優?そのスーパーバイザー戦略は、Kafkaの障害を処理するための優れた方法であると読んだためです(たとえば、障害時に再起動します)。

私は2つのオプションを見つけました:

  • JavaKafkaConsumerクラス:そのpoll()メソッドはを返しますMap[String, Object]DStream同じように返されるようにしKafkaUtils.createDirectStreamたいのですが、アクターの外部からストリームをフェッチする方法がわかりません。
  • ActorHelper特性を拡張し、actorStream()この例に示すように使用しますこの後者のオプションは、トピックへの接続ではなく、ソケットへの接続を表示します。

誰かが私を正しい方向に向けることができますか?

ジョン・マリンズ

Kafkaの失敗を処理するために、ApacheCuratorフレームワークと次の回避策を使用しました。

val client: CuratorFramework = ... // see docs
val zk: CuratorZookeeperClient = client.getZookeeperClient

/**
  * This method returns false if kafka or zookeeper is down.
  */ 
def isKafkaAvailable:Boolean = 
   Try {
      if (zk.isConnected) {
        val xs = client.getChildren.forPath("/brokers/ids")
        xs.size() > 0
      }
      else false
    }.getOrElse(false)

Kafkaのトピックを消費するために、私はcom.softwaremill.reactivekafkaライブラリを使用しました。例えば:

class KafkaConsumerActor extends Actor {
   val kafka = new ReactiveKafka()
   val config: ConsumerProperties[Array[Byte], Any] = ... // see docs

   override def preStart(): Unit = {
      super.preStart()

      val publisher = kafka.consume(config)
      Source.fromPublisher(publisher)
            .map(handleKafkaRecord)
            .to(Sink.ignore).run()
   }

   /**
     * This method will be invoked when any kafka records will happen.
     */
   def handleKafkaRecord(r: ConsumerRecord[Array[Byte], Any]) = {
      // handle record
   }
}

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

分類Dev

Sparkシェル(pyspark)からのSparkストリーミングアプリケーションのクエリ

分類Dev

KafkaからのSparkストリーミングのNULL値

分類Dev

SparkストリーミングからのKafkaSSLクライアントトラストストアファイルの読み取りエラー

分類Dev

クエリ例外から回復するSpark構造化ストリーミング

分類Dev

Sparkストリーミング+ Spark SQL

分類Dev

Kafkaトピック内の特定のパーティションからSparkを使用してデータをストリーミングする

分類Dev

Kafka コネクターによる Spark ストリーミングの停止

分類Dev

Spark ストリーミング アクションの順次実行

分類Dev

複数のKafkaトピックからのSpark構造化ストリーミングアプリの読み取り

分類Dev

SparkストリーミングとSparkアプリケーションを同じYARNクラスター内で実行できますか?

分類Dev

Sparkストリーミングのカスタム指標

分類Dev

kafkaで他のVMからSparkストリーミングを使用する方法

分類Dev

Spark構造化ストリーミングクエリの例外

分類Dev

Spark構造化ストリーミング:複数のシンク

分類Dev

Spark構造化ストリーミングチェックポイントのクリーンアップ

分類Dev

Sparkストリーミング:アプリケーションの状態

分類Dev

SparkストリーミングアプリケーションのOOM例外

分類Dev

Spark ストリーミングに Kafka/Flume の統合が必要なのはなぜですか? Sparkストリーミングはソースから直接データを消費できませんか?

分類Dev

Coarse GrainMesosクラスターのKafkaレシーバーを介したSparkストリーミング

分類Dev

SparkストリーミングアプリケーションでKafkaからJavaオブジェクトを受け取る方法

分類Dev

SparkストリーミングkafkaconcurrentModificationException

分類Dev

Sparkストリーミングデータからキャッシュを遅延構築する方法

分類Dev

Javaを使用してSparkストリーミングからcassandraにデータを保存する方法は?

分類Dev

Sparkストリーミングデータフレームの永続化操作

分類Dev

Spark構造化ストリーミングジョブからの書き込み中のデルタテーブルのバージョン管理

分類Dev

Sparkストリーミング/構造化ストリーミングでKafkaからのavroメッセージを読む

分類Dev

Sparkストリーミングで2つのキーを減らす方法は?

分類Dev

SPARKストリーミングでメモリの問題にぶつかるピクルスオブジェクト(モデルデータ)

分類Dev

KafkaおよびTextSocketStreamでのSparkストリーミングデータの配布

Related 関連記事

  1. 1

    Sparkシェル(pyspark)からのSparkストリーミングアプリケーションのクエリ

  2. 2

    KafkaからのSparkストリーミングのNULL値

  3. 3

    SparkストリーミングからのKafkaSSLクライアントトラストストアファイルの読み取りエラー

  4. 4

    クエリ例外から回復するSpark構造化ストリーミング

  5. 5

    Sparkストリーミング+ Spark SQL

  6. 6

    Kafkaトピック内の特定のパーティションからSparkを使用してデータをストリーミングする

  7. 7

    Kafka コネクターによる Spark ストリーミングの停止

  8. 8

    Spark ストリーミング アクションの順次実行

  9. 9

    複数のKafkaトピックからのSpark構造化ストリーミングアプリの読み取り

  10. 10

    SparkストリーミングとSparkアプリケーションを同じYARNクラスター内で実行できますか?

  11. 11

    Sparkストリーミングのカスタム指標

  12. 12

    kafkaで他のVMからSparkストリーミングを使用する方法

  13. 13

    Spark構造化ストリーミングクエリの例外

  14. 14

    Spark構造化ストリーミング:複数のシンク

  15. 15

    Spark構造化ストリーミングチェックポイントのクリーンアップ

  16. 16

    Sparkストリーミング:アプリケーションの状態

  17. 17

    SparkストリーミングアプリケーションのOOM例外

  18. 18

    Spark ストリーミングに Kafka/Flume の統合が必要なのはなぜですか? Sparkストリーミングはソースから直接データを消費できませんか?

  19. 19

    Coarse GrainMesosクラスターのKafkaレシーバーを介したSparkストリーミング

  20. 20

    SparkストリーミングアプリケーションでKafkaからJavaオブジェクトを受け取る方法

  21. 21

    SparkストリーミングkafkaconcurrentModificationException

  22. 22

    Sparkストリーミングデータからキャッシュを遅延構築する方法

  23. 23

    Javaを使用してSparkストリーミングからcassandraにデータを保存する方法は?

  24. 24

    Sparkストリーミングデータフレームの永続化操作

  25. 25

    Spark構造化ストリーミングジョブからの書き込み中のデルタテーブルのバージョン管理

  26. 26

    Sparkストリーミング/構造化ストリーミングでKafkaからのavroメッセージを読む

  27. 27

    Sparkストリーミングで2つのキーを減らす方法は?

  28. 28

    SPARKストリーミングでメモリの問題にぶつかるピクルスオブジェクト(モデルデータ)

  29. 29

    KafkaおよびTextSocketStreamでのSparkストリーミングデータの配布

ホットタグ

アーカイブ