Kafka Streams APIのArrayList Serdeに関する問題

エヴァルダスブイナウスカス:

私の以前の質問に基づいて、私はまだ私のコードの問題が何であるかを理解しようとしています。

私は可能な限り最も基本的なトピックを持っています:キーと値は一種でLongあり、これは私のプロデューサーコードです:

public class DemoProducer {
  public static void main(String... args) {
    Producer<Long, Long> producer = new KafkaProducer<>(createProperties());

    LongStream.range(1, 100)
        .forEach(
            i ->
                LongStream.range(100, 115)
                    .forEach(j -> producer.send(new ProducerRecord<>("test", i, j))));

    producer.close();
  }

  private static final Properties createProperties() {
    final Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.RETRIES_CONFIG, 0);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());

    return props;
  }
}

キーを使用してグループ化し、Kafka Streams APIを使用してArrayListに値を配置したいと思います。

これは変換を行い、新しいトピックに物事を置くことになっている私のストリームアプリですtest-aggregated

public class DemoStreams {
  public static void main(String... args) {
    final Serde<Long> longSerde = Serdes.Long();

    KStreamBuilder builder = new KStreamBuilder();

    builder
        .stream(longSerde, longSerde, "test")
        .groupByKey(longSerde, longSerde)
        .aggregate(
            ArrayList::new,
            (subscriberId, reportId, queue) -> {
              queue.add(reportId);
              return queue;
            },
            new ArrayListSerde<>(longSerde))
        .to(longSerde, new ArrayListSerde<>(longSerde), "test-aggregated");

    final KafkaStreams streams = new KafkaStreams(builder, createProperties());

    streams.cleanUp();
    streams.start();

    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  }

  private static Properties createProperties() {
    final Properties properties = new Properties();
    String longSerdes = Serdes.Long().getClass().getName();
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregation-app");
    properties.put(StreamsConfig.CLIENT_ID_CONFIG, "aggregation-app-client");
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, longSerdes);
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, ArrayListSerde.class);
    properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
    properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

    return properties;
  }
}

Serdeを次のように実装しました。

ArrayListSerde

public class ArrayListSerde<T> implements Serde<ArrayList<T>> {

  private final Serde<ArrayList<T>> inner;

  public ArrayListSerde(Serde<T> serde) {
    inner =
        Serdes.serdeFrom(
            new ArrayListSerializer<>(serde.serializer()),
            new ArrayListDeserializer<>(serde.deserializer()));
  }

  @Override
  public Serializer<ArrayList<T>> serializer() {
    return inner.serializer();
  }

  @Override
  public Deserializer<ArrayList<T>> deserializer() {
    return inner.deserializer();
  }

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    inner.serializer().configure(configs, isKey);
    inner.deserializer().configure(configs, isKey);
  }

  @Override
  public void close() {
    inner.serializer().close();
    inner.deserializer().close();
  }
}

ArrayListSerializer

public class ArrayListSerializer<T> implements Serializer<ArrayList<T>> {

  private Serializer<T> inner;

  public ArrayListSerializer(Serializer<T> inner) {
    this.inner = inner;
  }

  // Default constructor needed by Kafka
  public ArrayListSerializer() {}

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    // do nothing
  }

  @Override
  public byte[] serialize(String topic, ArrayList<T> queue) {
    final int size = queue.size();
    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    final DataOutputStream dos = new DataOutputStream(baos);
    final Iterator<T> iterator = queue.iterator();
    try {
      dos.writeInt(size);
      while (iterator.hasNext()) {
        final byte[] bytes = inner.serialize(topic, iterator.next());
        dos.writeInt(bytes.length);
        dos.write(bytes);
      }
    } catch (IOException e) {
      throw new RuntimeException("Unable to serialize ArrayList", e);
    }
    return baos.toByteArray();
  }

  @Override
  public void close() {
    inner.close();
  }
}

ArrayListDeserializer

public class ArrayListDeserializer<T> implements Deserializer<ArrayList<T>> {
  private final Deserializer<T> valueDeserializer;

  public ArrayListDeserializer(final Deserializer<T> valueDeserializer) {
    this.valueDeserializer = valueDeserializer;
  }

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    // do nothing
  }

  @Override
  public ArrayList<T> deserialize(String topic, byte[] bytes) {
    if (bytes == null || bytes.length == 0) {
      return null;
    }

    final ArrayList<T> arrayList = new ArrayList<>();
    final DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes));

    try {
      final int records = dataInputStream.readInt();
      for (int i = 0; i < records; i++) {
        final byte[] valueBytes = new byte[dataInputStream.readInt()];
        dataInputStream.read(valueBytes);
        arrayList.add(valueDeserializer.deserialize(topic, valueBytes));
      }
    } catch (IOException e) {
      throw new RuntimeException("Unable to deserialize ArrayList", e);
    }

    return arrayList;
  }

  @Override
  public void close() {
    // do nothing
  }
}

ただし、この例外が発生します。

Exception in thread "permission-agg4-client-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: stream-thread [aggregation-app-client-StreamThread-1] Failed to rebalance.
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:543)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
Caused by: org.apache.kafka.streams.errors.StreamsException: Failed to configure value serde class utils.ArrayListSerde
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:770)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init>(AbstractProcessorContext.java:59)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init>(ProcessorContextImpl.java:40)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:138)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1078)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:255)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:245)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1147)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$800(StreamThread.java:68)
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:184)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536)
    ... 3 more
Caused by: org.apache.kafka.common.KafkaException: Could not instantiate class utils.ArrayListSerde Does it have a public no-argument constructor?
    at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:286)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:246)
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:764)
    ... 19 more
Caused by: java.lang.InstantiationException: utils.ArrayListSerde
    at java.lang.Class.newInstance(Class.java:427)
    at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:282)
    ... 21 more
Caused by: java.lang.NoSuchMethodException: utils.ArrayListSerde.<init>()
    at java.lang.Class.getConstructor0(Class.java:3082)
    at java.lang.Class.newInstance(Class.java:412)
    ... 22 more

PriorityQueueConfluentのGitHubページにある例に基づいてSerdeを実装しようとしました:https : //github.com/confluentinc/kafka-streams-examples/tree/3.3.0-post/src/main/java/io/confluent/examples / streams / utils

Matthias J. Sax:

エラーが示すように、すべてSerdeのには引数のないコンストラクタが必要です。

原因:org.apache.kafka.common.KafkaException:クラスutils.ArrayListSerdeをインスタンス化できませんでした。引数のないパブリックコンストラクターがありますか?

あなたのクラスにArrayListSerdeはコンストラクタしかありません:

public ArrayListSerde(Serde<T> serde) { ... }

したがって、このエラーが発生します。

比較ArrayListSerializer

// Default constructor needed by Kafka
public ArrayListSerializer() {}

更新:

の標準実装ListSerdeはWIPであり、2.7リリースに含める必要があるため、カスタムリストSerdeは廃止されます。https://issues.apache.org/jira/browse/KAFKA-8326

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

分類Dev

Kafka Streams API:KStreamからKTable

分類Dev

Kafka Streams API:KStreamからKTable

分類Dev

Kafka Streams API:KStreamからKTable

分類Dev

Kafka Streams:ConsumerRebalanceListenerの実装

分類Dev

Kafka Streamer: ユーザー定義の「Serdes」に関する問題

分類Dev

Kafka Streams:Serdeキャストエラーを修正する方法

分類Dev

Kafka Streams:Jsonログのキーによるグループ化

分類Dev

Kafka Streamsで別々のトピックに参加しますか?

分類Dev

Kafka Streams:RocksDbを動的に構成する

分類Dev

kafka-streamsの最新のオフセットから常に消費する方法

分類Dev

Kafka Streams:RocksDB TTL

分類Dev

Kafka Connect and Streams

分類Dev

Kafka Streams TimestampExtractor

分類Dev

Kafka Streamsアプリを停止する

分類Dev

KGroupedTableの最新メッセージのみをコミットするKafka Streams

分類Dev

生産におけるkafka-streamsトポロジーの進化

分類Dev

Kafka Streamsウィンドウはどのように機能しますか?

分類Dev

Kafka Streamsのステートストアを削除/クリアする方法は?

分類Dev

Message pre-processing (topic - topic) - Kafka Connect API vs. Streams vs Kafka Consumer?

分類Dev

Kafka Streams APIに非同期/ノンブロッキングサポートがないのはなぜですか?

分類Dev

Kafka-TimestampExtractorに関する問題

分類Dev

Kafka Streamsがプロセッサの状態ストアを閉じる

分類Dev

Kafka Streams.allMetadata()メソッドは空のリストを返します

分類Dev

Kafka Streamsトポロジの処理順序は指定されていますか?

分類Dev

Kafka Streams API:セッションウィンドウの例外

分類Dev

Kafka Streams:at_least_onceを使用する場合、州のストアへの保存の順序に関する保証はありますか?

分類Dev

Apache Kafka StreamsがRocksDBを使用する理由と、それをどのように変更できるか?

分類Dev

Kafka Streams local state stores

分類Dev

Kafka streams aggregate throwing Exception

Related 関連記事

  1. 1

    Kafka Streams API:KStreamからKTable

  2. 2

    Kafka Streams API:KStreamからKTable

  3. 3

    Kafka Streams API:KStreamからKTable

  4. 4

    Kafka Streams:ConsumerRebalanceListenerの実装

  5. 5

    Kafka Streamer: ユーザー定義の「Serdes」に関する問題

  6. 6

    Kafka Streams:Serdeキャストエラーを修正する方法

  7. 7

    Kafka Streams:Jsonログのキーによるグループ化

  8. 8

    Kafka Streamsで別々のトピックに参加しますか?

  9. 9

    Kafka Streams:RocksDbを動的に構成する

  10. 10

    kafka-streamsの最新のオフセットから常に消費する方法

  11. 11

    Kafka Streams:RocksDB TTL

  12. 12

    Kafka Connect and Streams

  13. 13

    Kafka Streams TimestampExtractor

  14. 14

    Kafka Streamsアプリを停止する

  15. 15

    KGroupedTableの最新メッセージのみをコミットするKafka Streams

  16. 16

    生産におけるkafka-streamsトポロジーの進化

  17. 17

    Kafka Streamsウィンドウはどのように機能しますか?

  18. 18

    Kafka Streamsのステートストアを削除/クリアする方法は?

  19. 19

    Message pre-processing (topic - topic) - Kafka Connect API vs. Streams vs Kafka Consumer?

  20. 20

    Kafka Streams APIに非同期/ノンブロッキングサポートがないのはなぜですか?

  21. 21

    Kafka-TimestampExtractorに関する問題

  22. 22

    Kafka Streamsがプロセッサの状態ストアを閉じる

  23. 23

    Kafka Streams.allMetadata()メソッドは空のリストを返します

  24. 24

    Kafka Streamsトポロジの処理順序は指定されていますか?

  25. 25

    Kafka Streams API:セッションウィンドウの例外

  26. 26

    Kafka Streams:at_least_onceを使用する場合、州のストアへの保存の順序に関する保証はありますか?

  27. 27

    Apache Kafka StreamsがRocksDBを使用する理由と、それをどのように変更できるか?

  28. 28

    Kafka Streams local state stores

  29. 29

    Kafka streams aggregate throwing Exception

ホットタグ

アーカイブ