This is my Kafka Streams code, which uses a 10-second time window to sum all the integer values that arrive within each window.
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindowedKStream;
import org.apache.kafka.streams.kstream.TimeWindows;

public class KafkaWindowingLIS {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafkahost:9092");
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-windowing-lis");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        Integer uid = 1;
        long tenSeconds = 1000 * 10;
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> dataStream = builder.stream("kafka-windowing-lis");
        KStream<Integer, Integer> integerKStream = dataStream
            .filter((key, val) -> { //Filter only numbers from Stream
                try {
                    Integer.parseInt(val);
                    return true;
                } catch (NumberFormatException e) {
                    return false;
                }
            })
            .map((key, val) -> new KeyValue<Integer, Integer>(uid, Integer.parseInt(val)));
        TimeWindowedKStream<Integer, Integer> timeWindowedKStream = integerKStream
            .groupBy((k, v) -> k, Serialized.with(Serdes.Integer(), Serdes.Integer())) //Killed my time
            .windowedBy(TimeWindows.of(tenSeconds));
        timeWindowedKStream.aggregate(
                () -> 0,
                (key, value, aggregate) -> value + aggregate)
            .toStream().print(Printed.toSysOut());
        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config);
        // kafkaStreams.cleanUp();
        kafkaStreams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
    }
}
I am getting the following exception:
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual value type (value type: java.lang.Integer). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:204)
at org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:138)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$WindowStoreReadWriteDecorator.put(ProcessorContextImpl.java:533)
at org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:138)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:366)
... 5 more
Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:59)
at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:50)
at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:27)
at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:192)
... 14 more
I have looked for a solution on various pages, but it looks like I am missing something. Any input would be greatly appreciated. Thank you.
Because you use aggregate(), you need to set the output value serde explicitly via aggregate(..., Materialized.with(...)). The output value type might be different compared to the input value type, and thus the input value serde cannot be reused. (Because of Java type erasure, Kafka Streams does not know that the type did not actually change.) Hence, Kafka Streams falls back to the default value serde from the configuration, which here is the String serde, and that serde cannot serialize the Integer aggregate.
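To make the Materialized.with(...) suggestion concrete, here is a minimal sketch of how the topology from the question might look with the serdes passed to aggregate(). It is not a definitive implementation. The class name WindowedSumWithAggregate and the isInteger helper are made up for illustration; the topic name, the constant key 1, and the 10-second window come from the question. It assumes Kafka Streams 3.0 or later (Grouped and TimeWindows.ofSizeWithNoGrace); on older versions you would keep Serialized.with(...) and TimeWindows.of(tenSeconds) as in the question and only add the third argument to aggregate().

```java
import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.TimeWindows;

class WindowedSumWithAggregate {

    // Hypothetical helper: true if the value parses as an int (null is rejected as well).
    static boolean isInteger(String val) {
        try {
            Integer.parseInt(val);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<Integer, Integer> numbers = builder
            .stream("kafka-windowing-lis", Consumed.with(Serdes.String(), Serdes.String()))
            .filter((key, val) -> isInteger(val))
            // Same constant key as in the question, so all values land in one group.
            .map((key, val) -> new KeyValue<Integer, Integer>(1, Integer.parseInt(val)));

        numbers
            .groupByKey(Grouped.with(Serdes.Integer(), Serdes.Integer()))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(10)))
            .aggregate(
                () -> 0,
                (key, value, aggregate) -> value + aggregate,
                // The fix: tell the window store to use Integer serdes
                // instead of falling back to the default String serde.
                Materialized.with(Serdes.Integer(), Serdes.Integer()))
            .toStream()
            .print(Printed.toSysOut());
        return builder.build();
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafkahost:9092");
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-windowing-lis");
        KafkaStreams streams = new KafkaStreams(buildTopology(), config);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```

Running main needs a reachable broker at kafkahost:9092; buildTopology() on its own only needs the kafka-streams library on the classpath.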
As an alternative, you could use reduce() instead of aggregate() to fix the problem. In reduce(), the output type is the same as the input type, and thus the input value serde can be used for the output value.
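As a rough sketch of the reduce() variant, the windowed sum could be written like this. The class name WindowedSumWithReduce and the isInteger helper are made up for illustration; the topic name, the constant key 1, and the 10-second window come from the question. It assumes Kafka Streams 3.0 or later (Grouped and TimeWindows.ofSizeWithNoGrace). To stay short, main only prints the topology description instead of starting the application, so it does not need a broker.

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.TimeWindows;

class WindowedSumWithReduce {

    // Hypothetical helper: true if the value parses as an int (null is rejected as well).
    static boolean isInteger(String val) {
        try {
            Integer.parseInt(val);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<Integer, Integer> numbers = builder
            .stream("kafka-windowing-lis", Consumed.with(Serdes.String(), Serdes.String()))
            .filter((key, val) -> isInteger(val))
            .map((key, val) -> new KeyValue<Integer, Integer>(1, Integer.parseInt(val)));

        numbers
            // The Integer value serde given here is reused for the reduce() result,
            // because reduce() cannot change the value type.
            .groupByKey(Grouped.with(Serdes.Integer(), Serdes.Integer()))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(10)))
            .reduce(Integer::sum)
            .toStream()
            .print(Printed.toSysOut());
        return builder.build();
    }

    public static void main(String[] args) {
        // Prints the processor graph only; no Kafka connection is opened.
        System.out.println(buildTopology().describe());
    }
}
```

Since the sum here starts from the first value rather than from an explicit initial value, reduce() fits this case well; aggregate() is still the one to pick when the result type differs from the input type.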