SparkStreamingでKafkaConsumerを一時停止して再開します

ボルハ

:)

簡単に言うと、Kafkaからの新しいレコードを消費したくないという(奇妙な)状況に陥ったので、トピック内のすべてのパーティションのsparkStreaming消費(InputDStream [ConsumerRecord])を一時停止しいくつかの操作を実行して、最後に、レコードの消費を再開します。

まず第一に...これは可能ですか?

私はこのようなsthを試してきました:

var consumer: KafkaConsumer[String, String] = _    
consumer = new KafkaConsumer[String, String](properties)    
consumer.subscribe(java.util.Arrays.asList(topicName))

consumer.pause(consumer.assignment())
...
consumer.resume(consumer.assignment())

しかし、私はこれを手に入れました:

println(s"Assigned partitions: $consumer.assignment()") --> []
println(s"Paused partitions: ${consumer.paused()}") --> []
println(s"Partitions for: ${consumer.partitionsFor(topicNAme)}") --> [Partition(topic=topicAAA, partition=0, leader=1, replicas=[1,2,3], partition=1, ... ]

何が欠けているのか、そして消費者がパーティションを割り当てていることが明らかなのになぜ空の結果が得られるのかを理解するための助けは歓迎されます!

バージョン:Kafka:0.10 Spark:2.3.0 Scala:2.11.8

クイックシルバー

はい、可能ですコードにチェックポインティングを追加し、永続ストレージ(ローカルディスク、S3、HDFS)パスを渡します

また、ジョブを開始/再開するたびに、チェックポインティングからのコンシューマーオフセットを含むKafkaコンシューマーグループ情報を取得し、停止した場所から処理を開始します。

val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

Spark Check- = pointingは、オフセットを保存するだけでなく、ステージとジョブのDAGのシリアル化状態を保存するためのメカニズムです。したがって、新しいコードでジョブを再開するたびに、

  1. シリアル化されたデータを読み取って処理する
  2. Sparkアプリにコードの変更がある場合は、キャッシュされたDAGステージをクリーンアップします
  3. 最新のコードを使用して、新しいデータから処理を再開します。

ここで、ディスクからの読み取りは、SparkがKafkaオフセット、DAG、および古い不完全な処理済みデータをロードするために必要な1回限りの操作です。

完了すると、デフォルトまたは指定されたチェックポイント間隔で常にデータをディスクに保存し続けます。

SparkストリーミングにはKafkaグループIDを指定するオプションがありますが、Spark構造化ストリームにはありません。

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

分類Dev

setIntervalを一時停止して再開します

分類Dev

forループを一時停止して再開します

分類Dev

Swift:NSTimerを一時停止して再開します

分類Dev

ストリームを一時停止して再開します

分類Dev

Pythonでスレッドを一時停止して再開します

分類Dev

JavaScriptでsetIntervalを一時停止および再開します

分類Dev

他のブランチで作業している間、gitrebaseを一時停止して再開します

分類Dev

Javaスレッドを停止、中断、一時停止、再開します

分類Dev

脊椎アニメーションを一時停止して再開します

分類Dev

アプリケーションを一時停止して再開します

分類Dev

addEventListenerが一時停止して再開しない-Androidで電話または通知を受信した後、HTML5オーディオを再開します

分類Dev

Rでのキャレットトレーニングを一時停止して再開します

分類Dev

LinuxでPythonを使用してオーディオを一時停止または再開する

分類Dev

スレッドを一時停止/再開します

分類Dev

jmeterの実行を一時停止および再開します

分類Dev

Seleniumの実行を一時停止および再開します

分類Dev

forループを一時停止し、進行状況を保存して、もう一度再開しますか?

分類Dev

一時停止する前にsystemdサービスを停止し、再開後に再開します

分類Dev

タイマーを「一時停止」して再開しようとしています

分類Dev

Excelの読み取りを一時停止し、ボタンをクリックして再開します

分類Dev

一時停止、再開時に最後に一時停止した値を指定します

分類Dev

Bashでループを停止して再開します

分類Dev

UIViewPropertyAnimatorを使用して制約アニメーションを一時停止および再開します

分類Dev

Czを使用してコンソールemacsを一時停止/再開しますか?

分類Dev

ターミナルを閉じるときにwgetを一時停止して再開します

分類Dev

ExoPlayer 2の再生を一時停止して再開する方法(PlayerControlは削除されました)

分類Dev

ExoPlayer 2の再生を一時停止して再開する方法(PlayerControlは削除されました)

分類Dev

ExoPlayer 2の再生を一時停止して再開する方法(PlayerControlは削除されました)

分類Dev

スレッドアクティビティを一時停止して再開します

Related 関連記事

  1. 1

    setIntervalを一時停止して再開します

  2. 2

    forループを一時停止して再開します

  3. 3

    Swift:NSTimerを一時停止して再開します

  4. 4

    ストリームを一時停止して再開します

  5. 5

    Pythonでスレッドを一時停止して再開します

  6. 6

    JavaScriptでsetIntervalを一時停止および再開します

  7. 7

    他のブランチで作業している間、gitrebaseを一時停止して再開します

  8. 8

    Javaスレッドを停止、中断、一時停止、再開します

  9. 9

    脊椎アニメーションを一時停止して再開します

  10. 10

    アプリケーションを一時停止して再開します

  11. 11

    addEventListenerが一時停止して再開しない-Androidで電話または通知を受信した後、HTML5オーディオを再開します

  12. 12

    Rでのキャレットトレーニングを一時停止して再開します

  13. 13

    LinuxでPythonを使用してオーディオを一時停止または再開する

  14. 14

    スレッドを一時停止/再開します

  15. 15

    jmeterの実行を一時停止および再開します

  16. 16

    Seleniumの実行を一時停止および再開します

  17. 17

    forループを一時停止し、進行状況を保存して、もう一度再開しますか?

  18. 18

    一時停止する前にsystemdサービスを停止し、再開後に再開します

  19. 19

    タイマーを「一時停止」して再開しようとしています

  20. 20

    Excelの読み取りを一時停止し、ボタンをクリックして再開します

  21. 21

    一時停止、再開時に最後に一時停止した値を指定します

  22. 22

    Bashでループを停止して再開します

  23. 23

    UIViewPropertyAnimatorを使用して制約アニメーションを一時停止および再開します

  24. 24

    Czを使用してコンソールemacsを一時停止/再開しますか?

  25. 25

    ターミナルを閉じるときにwgetを一時停止して再開します

  26. 26

    ExoPlayer 2の再生を一時停止して再開する方法(PlayerControlは削除されました)

  27. 27

    ExoPlayer 2の再生を一時停止して再開する方法(PlayerControlは削除されました)

  28. 28

    ExoPlayer 2の再生を一時停止して再開する方法(PlayerControlは削除されました)

  29. 29

    スレッドアクティビティを一時停止して再開します

ホットタグ

アーカイブ