ユースケースをテストするために、非常に単純なKafkaStreamsデモアプリケーションを構築しています。
使用しているKafkaブローカー(現在バージョン0.10.0)をアップグレードできず、0.10.0より前のプロデューサーによって書き込まれたメッセージがいくつかあるため、カスタムTimestampExtractorを使用しています。メインクラスの最初の設定のデフォルト:
config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, GenericRecordTimestampExtractor.class);
私のソーストピックから消費するとき、これは完全にうまく機能します。しかし、集計演算子を使用すると、内部集計トピックから使用するときにカスタム実装の代わりにの実装が使用されるため、FailOnInvalidTimestamp
TimestampExtractor
例外が発生します。
Streamsアプリのコードは次のようになります。
...
KStream<String, MyValueClass> clickStream = streamsBuilder
.stream("mytopic", Consumed.with(Serdes.String(), valueClassSerde));
KTable<Windowed<Long>, Long> clicksByCustomerId = clickStream
.map(((key, value) -> new KeyValue<>(value.getId(), value)))
.groupByKey(Serialized.with(Serdes.Long(), valueClassSerde))
.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(1)))
.count();
...
私が遭遇している例外は次のとおりです。
Exception in thread "click-aggregator-b9d77f2e-0263-4fa3-bec4-e48d4d6602ab-StreamThread-1" org.apache.kafka.streams.errors.StreamsException:
Input record ConsumerRecord(topic = click-aggregator-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition, partition = 9, offset = 0, CreateTime = -1, serialized key size = 8, serialized value size = 652, headers = RecordHeaders(headers = [], isReadOnly = false), key = 11230, value = org.example.MyValueClass@2a3f2ea2) has invalid (negative) timestamp.
Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.
ここで問題はTimestampExtractor
、内部集約トピックから読み取るときにKafka Streamsにカスタムを使用させる方法はありますか(最適には、Streams DSLを使用している間)?
タイムスタンプエクストラクタを変更することはできません(現在v1.0.0
)。これは、正確性の理由から許可されていません。
しかし、そもそもタイムスタンプ-1のレコードがこのトピックにどのように書き込まれるのか本当に疑問に思っています。Kafka Streamsは、レコードの書き込み時にカスタムエクストラクターによって提供されたタイムスタンプを使用します。またKafkaProducer
、負のタイムスタンプを持つレコードを書き込むことはできません。
したがって、私が考えることができる唯一の説明は、他のプロデューサーが再パーティション化トピックに書き込んだということです-そしてこれは許可されていません... KafkaStreamsだけが再パーティション化トピックに書き込む必要があります。
このトピックを削除し、Kafka Streamsに再作成させて、クリーンな状態に戻す必要があると思います。
他の答えの議論/コメントから:
Kafka Streamsを使用するには、0.10以上の形式が必要です。ブローカーをアップグレードして0.9以前の形式を維持すると、KafkaStreamsが期待どおりに機能しない可能性があります。
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加