Kafka Streams TimestampExtractor

死後

みなさん、こんにちは。TimestampExtractorとKafkaStreamsについて質問があります。

私たちのアプリケーションでは、順不同のイベントを受信する可能性があるため、トピックに配置された時点ではなく、ペイロード内の営業日に従ってイベントを注文したいと思います。

この目的のために、ペイロードからタイムスタンプを取得できるようにカスタムTimestampExtractorをプログラムしました。ここで説明するまではすべて完璧に機能しましたが、このトピックに合わせてKTableを作成すると、順序が狂って受信したイベント(ビジネスの観点からは最後のイベントではなく、最後に受信したイベント)が次のように表示されることがわかりました。ペイロードからのタイムスタンプを持つConsumerRecord中のオブジェクトの最後の状態。

Kafka StreamがTimestampExtractorのこの異常な問題を修正すると仮定したのは、私の間違いだったのかもしれません。

次に、デバッグ中に、TimestampExtractorが-1を返した結果、Kafka Streamsがメッセージを無視し、TimestampExtractorが最後に受け入れられたイベントのタイムスタンプも配信していることを確認したので、次のチェック(payloadTimestamp <previousTimestamp)を実現するロジックを構築します。 、これは私が望む論理を達成しますが、私が危険な海域を航行しているかどうかはわかりません。

このようなロジックや、Kafkaストリームの異常なイベントを処理するために他にどのような方法が存在するかを処理することは許可されていますか?

答えのためのThx ..

マティアスJ.サックス

現在(Kafka 2.0)KTableは、入力トピックに異常なデータがないことを前提としているため、更新時にタイムスタンプを考慮しません。この仮定の理由は、「シングルライターの原則」です。圧縮されたKTable入力トピックの場合、キーごとに1つのプロデューサーしかないため、次の点に関して異常なデータは発生しないと想定されます。単一のキー。

これは既知の問題です:https//issues.apache.org/jira/browse/KAFKA-6521

あなたの修正のために:この「ハック」を行うことは100%正しくも安全でもありません:

  • まず、2つの異なるキーを持つ2つの異なるメッセージがあると仮定します<key1, value1, 5>, <key2, value2, 3>タイムスタンプ3の2番目のレコードは、タイムスタンプ5の最初のレコードと比較して遅くなります。ただし、両方のキーが異なるため、実際には2番目のレコードをKTableに配置する必要があります。同じキーを持つ2つのレコードがある場合にのみ、遅延到着データIHMOを削除する必要があります。
  • 次に、同じキーを持つ2つのレコードがあり、2番目のレコードが故障している場合、2番目のレコードを処理する前にクラッシュするとTimestampExtractor、最初のレコードのタイムスタンプが失われます。したがって、再起動時に、アウトオブオーダーレコードは破棄されません。

これを正しく行うには、ステートレスでキーに依存しないのではなく、アプリケーションロジックで「手動で」フィルタリングする必要がありますTimestampExtractorを介してデータを読み取る代わりに、データをbuilder#table()ストリームとして読み取り、を適用して.groupByKey().reduce()を構築できますKTableあなたにはReducer、ロジックは、新しいと古いレコードのタイムスタンプを比較し、大きい方のタイムスタンプを持つレコードを返します。

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

分類Dev

Kafka Streams:集約用のカスタムTimestampExtractor

分類Dev

Kafka-TimestampExtractorに関する問題

分類Dev

Kafka Streams:RocksDB TTL

分類Dev

Kafka Connect and Streams

分類Dev

Kafka Streams local state stores

分類Dev

Kafka streams aggregate throwing Exception

分類Dev

Kafka Streams GlobalKTable sychronization to applications

分類Dev

Kafka Streams: POJO serialization/deserialization

分類Dev

Kafka Connect vs Streams for Sinks

分類Dev

Kafka Streams - missing source topic

分類Dev

Kafka Streams API:KStreamからKTable

分類Dev

Kafka Streams API:KStreamからKTable

分類Dev

Kafka Streams API:KStreamからKTable

分類Dev

Kafka Streams:ConsumerRebalanceListenerの実装

分類Dev

Why is windowing now working for Kafka Streams?

分類Dev

NPE while deserializing avro messages in kafka streams

分類Dev

Kafka Streamsアプリを停止する

分類Dev

Kafka Streamsタスク割り当て

分類Dev

Why Apache Kafka Streams uses RocksDB and if how is it possible to change it?

分類Dev

How to add a cooldown/rate-limit to a stream in Kafka Streams?

分類Dev

Kafka Streams:句読点とプロセス

分類Dev

Kafka Streams 1.1.0: Consumer Group Reprocessing Entire Log

分類Dev

Kafka Streams: How to ensure offset is committed after processing is completed

分類Dev

Kafka streams on spring, trouble with exactly once ACL: TransactionalIdAuthorizationException

分類Dev

Kafka Streams:RocksDbを動的に構成する

分類Dev

Kafka Streams transform()状態ストア

分類Dev

Message pre-processing (topic - topic) - Kafka Connect API vs. Streams vs Kafka Consumer?

分類Dev

Kafka Streams: Should we advance stream time per key to test Windowed suppression?

分類Dev

Scala-KStream(Kafka Streams)をフィルタリングする方法

Related 関連記事

  1. 1

    Kafka Streams:集約用のカスタムTimestampExtractor

  2. 2

    Kafka-TimestampExtractorに関する問題

  3. 3

    Kafka Streams:RocksDB TTL

  4. 4

    Kafka Connect and Streams

  5. 5

    Kafka Streams local state stores

  6. 6

    Kafka streams aggregate throwing Exception

  7. 7

    Kafka Streams GlobalKTable sychronization to applications

  8. 8

    Kafka Streams: POJO serialization/deserialization

  9. 9

    Kafka Connect vs Streams for Sinks

  10. 10

    Kafka Streams - missing source topic

  11. 11

    Kafka Streams API:KStreamからKTable

  12. 12

    Kafka Streams API:KStreamからKTable

  13. 13

    Kafka Streams API:KStreamからKTable

  14. 14

    Kafka Streams:ConsumerRebalanceListenerの実装

  15. 15

    Why is windowing now working for Kafka Streams?

  16. 16

    NPE while deserializing avro messages in kafka streams

  17. 17

    Kafka Streamsアプリを停止する

  18. 18

    Kafka Streamsタスク割り当て

  19. 19

    Why Apache Kafka Streams uses RocksDB and if how is it possible to change it?

  20. 20

    How to add a cooldown/rate-limit to a stream in Kafka Streams?

  21. 21

    Kafka Streams:句読点とプロセス

  22. 22

    Kafka Streams 1.1.0: Consumer Group Reprocessing Entire Log

  23. 23

    Kafka Streams: How to ensure offset is committed after processing is completed

  24. 24

    Kafka streams on spring, trouble with exactly once ACL: TransactionalIdAuthorizationException

  25. 25

    Kafka Streams:RocksDbを動的に構成する

  26. 26

    Kafka Streams transform()状態ストア

  27. 27

    Message pre-processing (topic - topic) - Kafka Connect API vs. Streams vs Kafka Consumer?

  28. 28

    Kafka Streams: Should we advance stream time per key to test Windowed suppression?

  29. 29

    Scala-KStream(Kafka Streams)をフィルタリングする方法

ホットタグ

アーカイブ