SparkStreamingアプリケーションがあります。アーキテクチャは次のとおりです
キネシスからスパーク、カフカへ。
Sparkアプリケーションは、Kinesisからの構造化ストリーミングにqubole / kinesis-sqlを使用しています。次に、データが集約され、Kafkaにプッシュされます。
私たちのユースケースでは、Kafkaにプッシュする前に4分の遅延が必要です。
ウィンドウ処理は2分と4分の透かしで行われます
val windowedCountsDF = messageDS
.withWatermark("timestamp", "4 minutes")
.groupBy(window($"timestamp", "2 minutes", "2 minutes"), $"id", $"eventType", $"topic")
Kafkaへの書き込みは2分ごとにトリガーされます
val eventFilteredQuery = windowedCountsDF
.selectExpr("topic", "id as key", "to_json(struct(*)) AS value")
.writeStream
.trigger(Trigger.ProcessingTime("2 minutes"))
.format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
.option("checkpointLocation", checkPoint)
.outputMode("update")
.option("kafka.bootstrap.servers", kafkaBootstrapServers)
.queryName("events_kafka_stream")
.start()
ウィンドウに合わせてトリガー時間を変更できますが、それでも一部のイベントは即座にkafkaにプッシュされます。
ウィンドウが完了してからx分後にKafkaへの書き込みを遅らせる方法はありますか?
ありがとう
出力モードをupdate
からappend
(デフォルトオプション)に変更します。このoutput
モードでは、更新されたすべての行がシンクに書き込まれるため、透かしを使用するかどうかは関係ありません。
ただし、このappend
モードでは、透かしが交差するまで書き込みを待機する必要があります。これはまさに必要なことです。
追加モードでは、透かしを使用して古い集計状態を削除します。ただし、ウィンドウ化された集計の出力は
withWatermark()
、モードのセマンティクスで指定された遅延しきい値で遅延します。行は、ファイナライズされた後(つまり、透かしを超えた後)に1回だけ結果テーブルに追加できます。
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加