With normal Kafka Streams applications, Kafka stores each application's offsets in its internal offsets topic. On restart, an application resumes or reprocesses topics depending on the `auto.offset.reset` policy. This is indeed explained here.
I am using Kafka Streams' `GlobalKTable` to replicate data across application instances. However, I'm a bit confused about restarts: the table is not repopulated for applications whose id (`StreamsConfig.APPLICATION_ID_CONFIG`) does not change after a restart (due to a deployment or a crash). Whenever I start a new instance of the streams application with a new id, the `GlobalKTable` is populated.
A `GlobalKTable` is nothing but a topic with log compaction enabled. The javadoc of `StreamsBuilder#globalTable`, which I call as

    streamsBuilder.globalTable("some-topic", Materialized.as("kglobaltable-store"))

states:

> Note that `GlobalKTable` always applies `"auto.offset.reset"` strategy `"earliest"` regardless of the specified value in `StreamsConfig`.

Hence I expected that, regardless of the application id, my streams applications would read the `kglobaltable-store` topic from the start and populate the store locally, as in this GitHub issue. It seems the topic the javadoc refers to is `some-topic` instead of `kglobaltable-store`.
Is this the intended behaviour for `GlobalKTable`? And additionally, is there a retention policy on the topics backing `GlobalKTable`s?
This behaviour also results in stale data in `kglobaltable-store` when there is a retention policy on `some-topic`. An example would be as follows:

At time t0, let:

    some-topic: (1, a) -> (2, b) -> (1, c)
    kglobaltable-store: [(1, c), (2, b)]

After some time, (2, b) is subject to retention. If I then start my streams application (with a new id), my `GlobalKTable` only stores the record (1, c), if that is indeed how the store is rebuilt.
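The effect described above can be simulated with a plain map, which is essentially what the store materializes from the topic. This is only a sketch with hard-coded records, not actual Kafka code; `bootstrap` stands in for the replay a streams instance does on a fresh start:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CompactionExample {
    // Replaying a changelog into a key-value map keeps only the latest value
    // per key, which is exactly what log compaction guarantees to retain.
    static Map<Integer, String> bootstrap(List<Map.Entry<Integer, String>> log) {
        Map<Integer, String> store = new LinkedHashMap<>();
        for (Map.Entry<Integer, String> record : log) {
            store.put(record.getKey(), record.getValue());
        }
        return store;
    }

    public static void main(String[] args) {
        // some-topic at time t0: (1, a) -> (2, b) -> (1, c)
        List<Map.Entry<Integer, String>> t0 = List.of(
            Map.entry(1, "a"), Map.entry(2, "b"), Map.entry(1, "c"));
        System.out.println(bootstrap(t0));             // {1=c, 2=b}

        // After (2, b) is removed by retention, a fresh bootstrap misses key 2:
        List<Map.Entry<Integer, String>> afterRetention = List.of(
            Map.entry(1, "a"), Map.entry(1, "c"));
        System.out.println(bootstrap(afterRetention)); // {1=c}
    }
}
```

This is why an instance that keeps its old local state sees (2, b) while a freshly bootstrapped instance does not.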
EDIT: I am using an `InMemoryKeyValueStore`.
Because you are using `InMemoryKeyValueStore`, I assume that you are hitting this bug: https://issues.apache.org/jira/browse/KAFKA-6711

As a workaround, you can delete the local checkpoint file for the global store (cf. GlobalKTable checkpoints); this will trigger the bootstrapping on restart. Or you can switch back to the default RocksDB store.
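A minimal sketch of the checkpoint-deletion workaround. It assumes the default on-disk layout `<state.dir>/<application.id>/global/.checkpoint` (verify the path for your Kafka Streams version) and should run while the application is stopped:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class DeleteGlobalCheckpoint {
    // Deletes the global store's checkpoint file so that the next start
    // re-bootstraps the GlobalKTable from the beginning of its topic.
    static boolean deleteCheckpoint(Path stateDir, String applicationId) throws IOException {
        Path checkpoint = stateDir
            .resolve(applicationId)
            .resolve("global")
            .resolve(".checkpoint");
        return Files.deleteIfExists(checkpoint);
    }

    public static void main(String[] args) throws IOException {
        // "/tmp/kafka-streams" and "my-app-id" are placeholders; use your
        // configured state.dir and StreamsConfig.APPLICATION_ID_CONFIG value.
        boolean deleted = deleteCheckpoint(Path.of("/tmp/kafka-streams"), "my-app-id");
        System.out.println(deleted ? "checkpoint deleted" : "no checkpoint found");
    }
}
```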
Btw: If you read a topic directly as a table or global table, Kafka Streams will not create an additional changelog topic for fault tolerance, but uses the original input topic for this purpose (this reduces storage requirements within the Kafka cluster). Thus, those input topics should have log compaction enabled.