消費者アクターにKafkaトピックをサブスクライブさせ、消費者の外部でSparkStreamingを使用してさらに処理するためにデータをストリーミングしてもらいたいです。なぜ俳優?そのスーパーバイザー戦略は、Kafkaの障害を処理するための優れた方法であると読んだためです(たとえば、障害時に再起動します)。
私は2つのオプションを見つけました:
KafkaConsumer
クラス:そのpoll()
メソッドはを返しますMap[String, Object]
。とDStream
同じように返されるようにしKafkaUtils.createDirectStream
たいのですが、アクターの外部からストリームをフェッチする方法がわかりません。ActorHelper
特性を拡張し、actorStream()
この例に示すように使用します。この後者のオプションは、トピックへの接続ではなく、ソケットへの接続を表示します。誰かが私を正しい方向に向けることができますか?
Kafkaの失敗を処理するために、ApacheCuratorフレームワークと次の回避策を使用しました。
val client: CuratorFramework = ... // see docs
val zk: CuratorZookeeperClient = client.getZookeeperClient
/**
* This method returns false if kafka or zookeeper is down.
*/
def isKafkaAvailable:Boolean =
Try {
if (zk.isConnected) {
val xs = client.getChildren.forPath("/brokers/ids")
xs.size() > 0
}
else false
}.getOrElse(false)
Kafkaのトピックを消費するために、私はcom.softwaremill.reactivekafka
ライブラリを使用しました。例えば:
class KafkaConsumerActor extends Actor {
val kafka = new ReactiveKafka()
val config: ConsumerProperties[Array[Byte], Any] = ... // see docs
override def preStart(): Unit = {
super.preStart()
val publisher = kafka.consume(config)
Source.fromPublisher(publisher)
.map(handleKafkaRecord)
.to(Sink.ignore).run()
}
/**
* This method will be invoked when any kafka records will happen.
*/
def handleKafkaRecord(r: ConsumerRecord[Array[Byte], Any]) = {
// handle record
}
}
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加