:)
簡単に言うと、Kafkaからの新しいレコードを消費したくないという(奇妙な)状況に陥ったので、トピック内のすべてのパーティションのsparkStreaming消費(InputDStream [ConsumerRecord])を一時停止し、いくつかの操作を実行して、最後に、レコードの消費を再開します。
まず第一に...これは可能ですか?
私はこのようなsthを試してきました:
var consumer: KafkaConsumer[String, String] = _
consumer = new KafkaConsumer[String, String](properties)
consumer.subscribe(java.util.Arrays.asList(topicName))
consumer.pause(consumer.assignment())
...
consumer.resume(consumer.assignment())
しかし、私はこれを手に入れました:
println(s"Assigned partitions: $consumer.assignment()") --> []
println(s"Paused partitions: ${consumer.paused()}") --> []
println(s"Partitions for: ${consumer.partitionsFor(topicNAme)}") --> [Partition(topic=topicAAA, partition=0, leader=1, replicas=[1,2,3], partition=1, ... ]
何が欠けているのか、そして消費者がパーティションを割り当てていることが明らかなのになぜ空の結果が得られるのかを理解するための助けは歓迎されます!
バージョン:Kafka:0.10 Spark:2.3.0 Scala:2.11.8
はい、可能ですコードにチェックポインティングを追加し、永続ストレージ(ローカルディスク、S3、HDFS)パスを渡します
また、ジョブを開始/再開するたびに、チェックポインティングからのコンシューマーオフセットを含むKafkaコンシューマーグループ情報を取得し、停止した場所から処理を開始します。
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
Spark Check- = pointingは、オフセットを保存するだけでなく、ステージとジョブのDAGのシリアル化状態を保存するためのメカニズムです。したがって、新しいコードでジョブを再開するたびに、
ここで、ディスクからの読み取りは、SparkがKafkaオフセット、DAG、および古い不完全な処理済みデータをロードするために必要な1回限りの操作です。
完了すると、デフォルトまたは指定されたチェックポイント間隔で常にデータをディスクに保存し続けます。
SparkストリーミングにはKafkaグループIDを指定するオプションがありますが、Spark構造化ストリームにはありません。
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加