Kafka Streams GlobalKTable synchronization to applications

px5x2

With normal Kafka Streams applications, Kafka stores each application's offsets in its internal offsets topic. On a restart, an application resumes from those committed offsets, falling back to the auto.offset.reset policy when none exist. This is indeed explained here.
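For context, the configuration involved might look like the following sketch (the application id and broker address are placeholders; note that auto.offset.reset is only consulted when the group has no committed offsets):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class StreamsProps {
        public static Properties build() {
            Properties props = new Properties();
            // application.id doubles as the consumer group id, so committed
            // offsets survive restarts as long as the id stays the same.
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Only takes effect when no committed offsets exist for the group.
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            return props;
        }
    }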

I am using Kafka Streams' GlobalKTable to replicate data across application instances. However, I am confused about restarts: the table is not repopulated on instances whose application id (StreamsConfig.APPLICATION_ID_CONFIG) stays the same across a restart (after a deployment or a crash). Whenever I start a new instance of the streams application with a new id, the GlobalKTable is populated.

A GlobalKTable is essentially nothing more than a topic with the log-compaction feature enabled. I create it like this:

    streamsBuilder.globalTable("some-topic", Materialized.as("kglobaltable-store"));

The javadoc of StreamsBuilder#globalTable states:

Note that GlobalKTable always applies "auto.offset.reset" strategy "earliest" regardless of the specified value in StreamsConfig.

Hence I expect that, regardless of the application id, my streams applications read the kglobaltable-store topic from the beginning and populate the store locally, as in this github issue. It seems the topic the javadoc refers to is some-topic rather than kglobaltable-store.

Is this the intended behaviour for GlobalKTable? And, additionally, is there a retention policy on the topics backing GlobalKTables?

This behaviour also results in stale data in kglobaltable-store when there is a retention policy on some-topic. An example would be as follows:

At time t0, let:

some-topic: (1, a) -> (2, b) -> (1, c)

kglobaltable-store: [(1, c), (2, b)]

After some time, (2, b) becomes subject to retention and is deleted. If I then start my streams application (with a new id), its GlobalKTable only stores the record (1, c).

EDIT: I am using InMemoryKeyValueStore.
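For reference, the table is wired up roughly like this (a minimal sketch; the String serdes are an assumption on my part):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.GlobalKTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.Stores;

    public class GlobalTableTopology {
        public static StreamsBuilder build() {
            StreamsBuilder builder = new StreamsBuilder();
            // Materialize the global table into an in-memory store instead of
            // the default RocksDB store.
            GlobalKTable<String, String> table = builder.globalTable(
                "some-topic",
                Materialized.<String, String>as(Stores.inMemoryKeyValueStore("kglobaltable-store"))
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.String()));
            return builder;
        }
    }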

Matthias J. Sax

Because you are using InMemoryKeyValueStore, I assume that you are hitting this bug: https://issues.apache.org/jira/browse/KAFKA-6711

As a workaround, you can delete the local checkpoint file (cf. GlobalKTable checkpoints) for the global store -- this will trigger bootstrapping on restart. Or you can switch back to the default RocksDB store.
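A minimal sketch of that workaround, assuming the default state directory layout (state.dir defaults to /tmp/kafka-streams, and the global store's checkpoint is assumed to live at <state.dir>/<application.id>/global/.checkpoint -- verify the path for your Kafka Streams version before deleting anything):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class DeleteGlobalCheckpoint {
        public static void main(String[] args) throws IOException {
            // Assumed layout: <state.dir>/<application.id>/global/.checkpoint
            Path checkpoint = Paths.get("/tmp/kafka-streams", "my-streams-app", "global", ".checkpoint");
            // Without a checkpoint, the global store is bootstrapped from the
            // beginning of its source topic on the next application start.
            Files.deleteIfExists(checkpoint);
        }
    }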

Btw: if you read a topic directly as a table or global table, Kafka Streams will not create an additional changelog topic for fault tolerance, but uses the original input topic for this purpose (this reduces storage requirements within the Kafka cluster). Thus, those input topics should have log compaction enabled.
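For illustration, creating a compacted input topic with the AdminClient could look like this sketch (recent kafka-clients assumed; topic name taken from the question, partition and replication counts are placeholders):

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;

    public class CreateCompactedTopic {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (Admin admin = Admin.create(props)) {
                // Compaction keeps the latest record per key rather than
                // deleting records by time, which is what a table-backing
                // topic needs.
                NewTopic topic = new NewTopic("some-topic", 3, (short) 1)
                        .configs(Map.of("cleanup.policy", "compact"));
                admin.createTopics(List.of(topic)).all().get();
            }
        }
    }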
