遅延を伴うKafkaへのSparkStreaming書き込み-x分後

ジェンソンエフライム

SparkStreamingアプリケーションがあります。アーキテクチャは次のとおりです

キネシスからスパーク、カフカへ。

Sparkアプリケーションは、Kinesisからの構造化ストリーミングにqubole / kinesis-sqlを使用しています。次に、データが集約され、Kafkaにプッシュされます。

私たちのユースケースでは、Kafkaにプッシュする前に4分の遅延が必要です。

ウィンドウ処理は2分と4分の透かしで行われます

val windowedCountsDF = messageDS
   .withWatermark("timestamp", "4 minutes")
   .groupBy(window($"timestamp", "2 minutes", "2 minutes"), $"id", $"eventType", $"topic")

Kafkaへの書き込みは2分ごとにトリガーされます

val eventFilteredQuery = windowedCountsDF
  .selectExpr("topic", "id as key", "to_json(struct(*)) AS value")
  .writeStream
  .trigger(Trigger.ProcessingTime("2 minutes"))
  .format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
  .option("checkpointLocation", checkPoint)
  .outputMode("update")
  .option("kafka.bootstrap.servers", kafkaBootstrapServers)
  .queryName("events_kafka_stream")
  .start()

ウィンドウに合わせてトリガー時間を変更できますが、それでも一部のイベントは即座にkafkaにプッシュされます。

ウィンドウが完了してからx分後にKafkaへの書き込みを遅らせる方法はありますか?

ありがとう

シャイド

出力モードをupdateからappend(デフォルトオプション)に変更しますこのoutputモードでは、更新されたすべての行がシンクに書き込まれるため、透かしを使用するかどうかは関係ありません。

ただし、このappendモードでは、透かしが交差するまで書き込みを待機する必要があります。これはまさに必要なことです。

追加モードでは、透かしを使用して古い集計状態を削除します。ただし、ウィンドウ化された集計の出力はwithWatermark()、モードのセマンティクスで指定された遅延しきい値で遅延します。行は、ファイナライズされた後(つまり、透かしを超えた後)に1回だけ結果テーブルに追加できます。

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

分類Dev

Tor Browserを更新した後、15分の遅延後にのみ開きます

分類Dev

`pv`を使用したディスクへの書き込みは、最初は速く、最後は遅いようです。

分類Dev

JQuery : 一定の遅延後に「読み込み中」を表示

分類Dev

最後のX分を取得する方法-Angularjs

分類Dev

HttpClient postAbsVert.xで2分後に応答を取得する

分類Dev

遅延読み込みの例外を伴うWildflyとMybatis

分類Dev

ディスクへのデータの書き込みを遅らせる背後にある哲学は何ですか?

分類Dev

DB書き込みはSparkStreamingで遅延実行されます

分類Dev

h5pyを使用したhdf5への増分書き込み

分類Dev

.datファイルへの書き込み、番号の増分

分類Dev

ソケットへの書き込み間の遅延の作成

分類Dev

JPA Hibernate、結合後の遅延読み込み

分類Dev

Pythonでのファイルへの「遅延」書き込み

分類Dev

char *に変換した後のstd :: atomicへの書き込み

分類Dev

Swift-後でnilにすることができるプロパティの遅延読み込み

分類Dev

画像の遅延読み込み後に画像が壊れた場合、画像のデフォルトを設定します。どうすればいいですか?

分類Dev

Angular7遅延読み込みを使用した後にバインドできません

分類Dev

ストリームAPIとPrintWriterを使用して、ファイルへの書き込み時に最後の改行を避けるためにどのように?

分類Dev

投稿、書き込み、削除の後に取得フェッチを行う方法は?

分類Dev

CSSを使用したページ読み込み後のスライドの遅延

分類Dev

遅延後のページの読み込み時に純粋なCSSブックカバーをめくる

分類Dev

Jenkinsは最後のビルドからX分後にトリガーします

分類Dev

画像の遅延読み込み後にサイズ変更関数を呼び出します

分類Dev

GKEを使用したGCS、403GCSバケットへの書き込み権限が不十分

分類Dev

java.io.Fileは、ファイルへの書き込み中に余分な文字を取得します

分類Dev

C# TcpClient - StreamWriter への後続の呼び出しは、最初の 1 つだけを書き込みます

分類Dev

5分後にのみ新しいファイルを作成し、そのファイルに5分すべてのデータを書き込む方法

分類Dev

[]バイトへの追加、ファイルへの書き込み、読み取りを伴うGoの問題

分類Dev

ノードでcheerioを使用して変更した後のファイルへの書き込み

Related 関連記事

  1. 1

    Tor Browserを更新した後、15分の遅延後にのみ開きます

  2. 2

    `pv`を使用したディスクへの書き込みは、最初は速く、最後は遅いようです。

  3. 3

    JQuery : 一定の遅延後に「読み込み中」を表示

  4. 4

    最後のX分を取得する方法-Angularjs

  5. 5

    HttpClient postAbsVert.xで2分後に応答を取得する

  6. 6

    遅延読み込みの例外を伴うWildflyとMybatis

  7. 7

    ディスクへのデータの書き込みを遅らせる背後にある哲学は何ですか?

  8. 8

    DB書き込みはSparkStreamingで遅延実行されます

  9. 9

    h5pyを使用したhdf5への増分書き込み

  10. 10

    .datファイルへの書き込み、番号の増分

  11. 11

    ソケットへの書き込み間の遅延の作成

  12. 12

    JPA Hibernate、結合後の遅延読み込み

  13. 13

    Pythonでのファイルへの「遅延」書き込み

  14. 14

    char *に変換した後のstd :: atomicへの書き込み

  15. 15

    Swift-後でnilにすることができるプロパティの遅延読み込み

  16. 16

    画像の遅延読み込み後に画像が壊れた場合、画像のデフォルトを設定します。どうすればいいですか?

  17. 17

    Angular7遅延読み込みを使用した後にバインドできません

  18. 18

    ストリームAPIとPrintWriterを使用して、ファイルへの書き込み時に最後の改行を避けるためにどのように?

  19. 19

    投稿、書き込み、削除の後に取得フェッチを行う方法は?

  20. 20

    CSSを使用したページ読み込み後のスライドの遅延

  21. 21

    遅延後のページの読み込み時に純粋なCSSブックカバーをめくる

  22. 22

    Jenkinsは最後のビルドからX分後にトリガーします

  23. 23

    画像の遅延読み込み後にサイズ変更関数を呼び出します

  24. 24

    GKEを使用したGCS、403GCSバケットへの書き込み権限が不十分

  25. 25

    java.io.Fileは、ファイルへの書き込み中に余分な文字を取得します

  26. 26

    C# TcpClient - StreamWriter への後続の呼び出しは、最初の 1 つだけを書き込みます

  27. 27

    5分後にのみ新しいファイルを作成し、そのファイルに5分すべてのデータを書き込む方法

  28. 28

    []バイトへの追加、ファイルへの書き込み、読み取りを伴うGoの問題

  29. 29

    ノードでcheerioを使用して変更した後のファイルへの書き込み

ホットタグ

アーカイブ