みなさん、こんにちは。TimestampExtractorとKafkaStreamsについて質問があります。
私たちのアプリケーションでは、順不同のイベントを受信する可能性があるため、トピックに配置された時点ではなく、ペイロード内の営業日に従ってイベントを注文したいと思います。
この目的のために、ペイロードからタイムスタンプを取得できるようにカスタムTimestampExtractorをプログラムしました。ここで説明するまではすべて完璧に機能しましたが、このトピックに合わせてKTableを作成すると、順序が狂って受信したイベント(ビジネスの観点からは最後のイベントではなく、最後に受信したイベント)が次のように表示されることがわかりました。ペイロードからのタイムスタンプを持つConsumerRecord中のオブジェクトの最後の状態。
Kafka StreamがTimestampExtractorのこの異常な問題を修正すると仮定したのは、私の間違いだったのかもしれません。
次に、デバッグ中に、TimestampExtractorが-1を返した結果、Kafka Streamsがメッセージを無視し、TimestampExtractorが最後に受け入れられたイベントのタイムスタンプも配信していることを確認したので、次のチェック(payloadTimestamp <previousTimestamp)を実現するロジックを構築します。 、これは私が望む論理を達成しますが、私が危険な海域を航行しているかどうかはわかりません。
このようなロジックや、Kafkaストリームの異常なイベントを処理するために他にどのような方法が存在するかを処理することは許可されていますか?
答えのためのThx ..
現在(Kafka 2.0)KTable
は、入力トピックに異常なデータがないことを前提としているため、更新時にタイムスタンプを考慮しません。この仮定の理由は、「シングルライターの原則」です。圧縮されたKTable入力トピックの場合、キーごとに1つのプロデューサーしかないため、次の点に関して異常なデータは発生しないと想定されます。単一のキー。
これは既知の問題です:https://issues.apache.org/jira/browse/KAFKA-6521
あなたの修正のために:この「ハック」を行うことは100%正しくも安全でもありません:
<key1, value1, 5>, <key2, value2, 3>
。タイムスタンプ3の2番目のレコードは、タイムスタンプ5の最初のレコードと比較して遅くなります。ただし、両方のキーが異なるため、実際には2番目のレコードをKTableに配置する必要があります。同じキーを持つ2つのレコードがある場合にのみ、遅延到着データIHMOを削除する必要があります。TimestampExtractor
、最初のレコードのタイムスタンプが失われます。したがって、再起動時に、アウトオブオーダーレコードは破棄されません。これを正しく行うには、ステートレスでキーに依存しないのではなく、アプリケーションロジックで「手動で」フィルタリングする必要がありますTimestampExtractor
。を介してデータを読み取る代わりに、データをbuilder#table()
ストリームとして読み取り、を適用して.groupByKey().reduce()
を構築できますKTable
。あなたにはReducer
、ロジックは、新しいと古いレコードのタイムスタンプを比較し、大きい方のタイムスタンプを持つレコードを返します。
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加