Kafka Streams aggregate() throwing an exception

Anthony :

This is my Kafka Streams code, which uses a 10-second time window to sum all the integer values that arrive within the window.

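import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindowedKStream;
import org.apache.kafka.streams.kstream.TimeWindows;

public class KafkaWindowingLIS {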
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafkahost:9092");
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-windowing-lis");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        Integer uid = 1;
        long tenSeconds = 1000 * 10;

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> dataStream = builder.stream("kafka-windowing-lis");
        KStream<Integer, Integer> integerKStream = dataStream
                .filter((key, val) -> {               //Filter only numbers from Stream
                    try {
                        Integer.parseInt(val);
                        return true;
                    } catch (NumberFormatException e) {
                        return false;
                    }
                })
                .map((key, val) -> new KeyValue<Integer, Integer>(uid, Integer.parseInt(val)));

        TimeWindowedKStream<Integer, Integer> timeWindowedKStream = integerKStream
                .groupBy((k, v) -> k, Serialized.with(Serdes.Integer(), Serdes.Integer()))     //Killed my time
                .windowedBy(TimeWindows.of(tenSeconds));

        timeWindowedKStream.aggregate(
                () -> 0,
                (key, value, aggregate) -> value + aggregate)
                .toStream().print(Printed.toSysOut());


        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config);
//        kafkaStreams.cleanUp();
        kafkaStreams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
    }
}

I am getting the following exception:

Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual value type (value type: java.lang.Integer). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:204)
    at org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:138)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$WindowStoreReadWriteDecorator.put(ProcessorContextImpl.java:533)
    at org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:138)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:366)
    ... 5 more
Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
    at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:59)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:50)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:27)
    at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:192)
    ... 14 more

I looked for a solution on various pages, but it looks like I am missing something. Any input would be greatly appreciated. Thank you.

Matthias J. Sax :

Because you use aggregate(), you need to set the output value serde explicitly by passing a Materialized as the last argument, e.g. aggregate(initializer, aggregator, Materialized.with(Serdes.Integer(), Serdes.Integer())). The output value type may differ from the input value type, so the input value serde cannot be reused. (Because of Java type erasure, Kafka Streams cannot tell that the type did not actually change.) Hence, Kafka Streams falls back to the default serde from the configuration, which is the String serde in your case, and that is what produces the ClassCastException.
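To make the erasure point concrete, here is a minimal plain-Java sketch with no Kafka dependency. ToySerializer, STRING, INTEGER and store() are hypothetical names invented for this illustration; they only loosely model "use the explicit serde if one is given, otherwise fall back to the configured default" and are not Kafka classes. The unchecked cast compiles because generic type arguments are gone at runtime, so the mismatch only shows up when the value is actually serialized.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ErasureDemo {
    // Hypothetical stand-in for a serializer; NOT a Kafka interface.
    public interface ToySerializer<T> {
        byte[] serialize(T value);
    }

    // Plays the role of the configured default (a String serializer).
    public static final ToySerializer<String> STRING =
            s -> s.getBytes(StandardCharsets.UTF_8);

    // Plays the role of an explicitly supplied Integer serializer.
    public static final ToySerializer<Integer> INTEGER =
            i -> ByteBuffer.allocate(Integer.BYTES).putInt(i).array();

    // Uses the explicit serializer if present, otherwise the default.
    // The cast is unchecked: at runtime nothing records that STRING
    // only accepts String values.
    @SuppressWarnings("unchecked")
    public static <VR> byte[] store(VR aggregate, ToySerializer<VR> explicit) {
        ToySerializer<VR> chosen =
                explicit != null ? explicit : (ToySerializer<VR>) STRING;
        return chosen.serialize(aggregate);
    }

    public static void main(String[] args) {
        Integer sum = 42;
        try {
            store(sum, null); // no explicit serializer: falls back to STRING
        } catch (ClassCastException e) {
            System.out.println("default String serializer rejected an Integer");
        }
        // With an explicit Integer serializer the same value is stored fine.
        System.out.println(store(sum, INTEGER).length + " bytes written");
    }
}
```

The failure happens at the serialize call rather than at compile time, which matches the shape of the stack trace in the question: the topology builds, and the exception appears only when the first aggregate is written to the store.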

As an alternative, you could use reduce() instead of aggregate() to fix the problem. In reduce(), the output type is the same as the input type, so the input value serde can be reused for the output value.
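The difference between the two signatures can be sketched in plain Java. ToyGrouped below is a hypothetical toy class written for this illustration, not part of Kafka Streams, and a Function that encodes a value to a String stands in for a serde. It assumes a non-empty list of values. In reduce() the result has the input type V, so the encoder the grouped stream already holds always fits; in aggregate() the result type VR is independent of V, so the caller has to supply an encoder for it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;

public class ReduceVsAggregate {
    // Hypothetical toy model of a grouped stream; NOT a Kafka class.
    public static final class ToyGrouped<V> {
        private final List<V> values;
        // Stand-in for the input value serde.
        private final Function<V, String> valueEncoder;

        public ToyGrouped(List<V> values, Function<V, String> valueEncoder) {
            this.values = values;
            this.valueEncoder = valueEncoder;
        }

        // Result type equals input type V: the stored encoder can be reused.
        public String reduce(BinaryOperator<V> reducer) {
            V result = values.get(0);
            for (V v : values.subList(1, values.size())) {
                result = reducer.apply(result, v);
            }
            return valueEncoder.apply(result);
        }

        // Result type VR may differ from V: an encoder for VR must be passed in.
        public <VR> String aggregate(Supplier<VR> initializer,
                                     BiFunction<V, VR, VR> adder,
                                     Function<VR, String> resultEncoder) {
            VR agg = initializer.get();
            for (V v : values) {
                agg = adder.apply(v, agg);
            }
            return resultEncoder.apply(agg);
        }
    }

    public static void main(String[] args) {
        ToyGrouped<Integer> grouped =
                new ToyGrouped<>(Arrays.asList(1, 2, 3), i -> "int:" + i);
        // Same type in and out, no extra encoder needed.
        System.out.println(grouped.reduce(Integer::sum));
        // Integer in, Long out: the result encoder is required.
        System.out.println(grouped.<Long>aggregate(
                () -> 0L, (v, agg) -> agg + v, l -> "long:" + l));
    }
}
```

In the question's topology the sum of Integer values is itself an Integer, which is why reduce() is a natural fit there.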
