Kafka Streams:集約用のカスタムTimestampExtractor

ハウケ

ユースケースをテストするために、非常に単純なKafkaStreamsデモアプリケーションを構築しています。

使用しているKafkaブローカー(現在バージョン0.10.0)をアップグレードできず、0.10.0より前のプロデューサーによって書き込まれたメッセージがいくつかあるため、カスタムTimestampExtractorを使用しています。メインクラスの最初の設定のデフォルト:

config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, GenericRecordTimestampExtractor.class);

私のソーストピックから消費するとき、これは完全にうまく機能します。しかし、集計演算子を使用すると、内部集計トピックから使用するときにカスタム実装の代わりにの実装が使用されるため、FailOnInvalidTimestampTimestampExtractor例外が発生します

Streamsアプリのコードは次のようになります。

...

KStream<String, MyValueClass> clickStream = streamsBuilder
              .stream("mytopic", Consumed.with(Serdes.String(), valueClassSerde));

KTable<Windowed<Long>, Long> clicksByCustomerId = clickStream
              .map(((key, value) -> new KeyValue<>(value.getId(), value)))
              .groupByKey(Serialized.with(Serdes.Long(), valueClassSerde))
              .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(1)))
              .count();
...

私が遭遇している例外は次のとおりです。

    Exception in thread "click-aggregator-b9d77f2e-0263-4fa3-bec4-e48d4d6602ab-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: 
Input record ConsumerRecord(topic = click-aggregator-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition, partition = 9, offset = 0, CreateTime = -1, serialized key size = 8, serialized value size = 652, headers = RecordHeaders(headers = [], isReadOnly = false), key = 11230, value = org.example.MyValueClass@2a3f2ea2) has invalid (negative) timestamp. 
Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.

ここで問題はTimestampExtractor、内部集約トピックから読み取るときにKafka Streamsにカスタムを使用させる方法はありますか(最適には、Streams DSLを使用している間)?

マティアスJ.サックス

タイムスタンプエクストラクタを変更することはできません(現在v1.0.0)。これは、正確性の理由から許可されていません。

しかし、そもそもタイムスタンプ-1のレコードがこのトピックにどのように書き込まれるのか本当に疑問に思っています。Kafka Streamsは、レコードの書き込み時にカスタムエクストラクターによって提供されたタイムスタンプを使用します。またKafkaProducer、負のタイムスタンプを持つレコードを書き込むことはできません。

したがって、私が考えることができる唯一の説明は、他のプロデューサーが再パーティション化トピックに書き込んだということです-そしてこれは許可されていません... KafkaStreamsだけが再パーティション化トピックに書き込む必要があります。

このトピックを削除し、Kafka Streamsに再作成させて、クリーンな状態に戻す必要があると思います。

他の答えの議論/コメントから:

Kafka Streamsを使用するには、0.10以上の形式が必要です。ブローカーをアップグレードして0.9以前の形式を維持すると、KafkaStreamsが期待どおりに機能しない可能性があります。

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

分類Dev

Kafka Streams TimestampExtractor

分類Dev

Kafkaストリーム集約RecordTooLargeException

分類Dev

Axon Kafkaの統合-カスタムKafka ProducerFactoryの使用

分類Dev

kafkaストリームでの集約と状態ストアの保持

分類Dev

Kafka Streams:RocksDB TTL

分類Dev

Kafka Connect and Streams

分類Dev

LocalDateTime用のカスタムSpring-kafkaデシリアライザー

分類Dev

Kafkaストリーム集約スロー例外

分類Dev

カスタムオブジェクトを使用してKafkaストリームを処理および集約する方法は?

分類Dev

Kafkaストリーム集約タスクの結果を取得し、データを別のサービスに送信する方法は?

分類Dev

KafkaのCommitSync

分類Dev

Kafka Streams local state stores

分類Dev

Kafka streams aggregate throwing Exception

分類Dev

Kafka Streams GlobalKTable sychronization to applications

分類Dev

Kafka Streams: POJO serialization/deserialization

分類Dev

Kafka Connect vs Streams for Sinks

分類Dev

Kafka Streams - missing source topic

分類Dev

Kafka Streamsタスク割り当て

分類Dev

コンフルエントなkafka-connect-s3用のカスタムAvroConverterの実装

分類Dev

Kafka Streams:ConsumerRebalanceListenerの実装

分類Dev

Kafka Streams用のSparkのようなアキュムレータはありますか?

分類Dev

kafka用に独自のカスタムパーティショナーを作成する

分類Dev

カスタム変換のためのKafkaストリームの使用

分類Dev

kafka接続とkafkaホストの要件

分類Dev

ブローカー側のKafkaトピックのカスタム圧縮?

分類Dev

Kafka用CSVコネクタ

分類Dev

Kafka用CSVコネクタ

分類Dev

2つの異なるkafkaクラスター設定用のSpring Kafka構成

分類Dev

Kafka ConnectJDBCシンク-KSQLからの集約データを格納します

Related 関連記事

  1. 1

    Kafka Streams TimestampExtractor

  2. 2

    Kafkaストリーム集約RecordTooLargeException

  3. 3

    Axon Kafkaの統合-カスタムKafka ProducerFactoryの使用

  4. 4

    kafkaストリームでの集約と状態ストアの保持

  5. 5

    Kafka Streams:RocksDB TTL

  6. 6

    Kafka Connect and Streams

  7. 7

    LocalDateTime用のカスタムSpring-kafkaデシリアライザー

  8. 8

    Kafkaストリーム集約スロー例外

  9. 9

    カスタムオブジェクトを使用してKafkaストリームを処理および集約する方法は?

  10. 10

    Kafkaストリーム集約タスクの結果を取得し、データを別のサービスに送信する方法は?

  11. 11

    KafkaのCommitSync

  12. 12

    Kafka Streams local state stores

  13. 13

    Kafka streams aggregate throwing Exception

  14. 14

    Kafka Streams GlobalKTable sychronization to applications

  15. 15

    Kafka Streams: POJO serialization/deserialization

  16. 16

    Kafka Connect vs Streams for Sinks

  17. 17

    Kafka Streams - missing source topic

  18. 18

    Kafka Streamsタスク割り当て

  19. 19

    コンフルエントなkafka-connect-s3用のカスタムAvroConverterの実装

  20. 20

    Kafka Streams:ConsumerRebalanceListenerの実装

  21. 21

    Kafka Streams用のSparkのようなアキュムレータはありますか?

  22. 22

    kafka用に独自のカスタムパーティショナーを作成する

  23. 23

    カスタム変換のためのKafkaストリームの使用

  24. 24

    kafka接続とkafkaホストの要件

  25. 25

    ブローカー側のKafkaトピックのカスタム圧縮?

  26. 26

    Kafka用CSVコネクタ

  27. 27

    Kafka用CSVコネクタ

  28. 28

    2つの異なるkafkaクラスター設定用のSpring Kafka構成

  29. 29

    Kafka ConnectJDBCシンク-KSQLからの集約データを格納します

ホットタグ

アーカイブ