Kafka Avro Consumer with Decoderの問題

SparkleGoat:

それぞれのスキーマのデータに対してAvroでKafkaコンシューマーを実行しようとすると、「AvroRuntimeException:Malformed data。Length is negative:-40」というエラーが返されます。他にもバイト配列をjsonAvroの書き込みと読み取りKafka Avro Binary * coderに変換する際に同様の問題が発生たようです。私もこのコンシューマグループの例を参照しましたが、これはすべて役に立ちましたが、これまでのところこのエラーには役立ちません。コードのこの部分(73行目)まで機能します。

デコーダーデコーダー= DecoderFactory.get()。binaryDecoder(byteArrayInputStream、null);

私は他のデコーダーを試し、byteArrayInputStream変数の内容を出力しました。これは、シリアル化されたavroデータがどのように見えると私が思うと思いますか(メッセージで、スキーマと一部のデータといくつかの不正なデータを確認できます)を出力しました。 594を返す.available()メソッドを使用して利用可能なバイト。このエラーが発生している理由を理解できません。Apache Nifiを使用して、hdfsから同じスキーマでKafkaストリームを生成します。何か助けていただければ幸いです。

マイケルG.ノール:

おそらく、問題は、NifiによるAvroデータの書き込み(エンコード)方法と、コンシューマーアプリによるデータの読み取り(デコード)方法の不一致です。

一言で言えば、AvroのAPIは、シリアル化に対して2つの異なるアプローチを提供します。

  1. 適切なAvro ファイルを作成するには:データレコードをエンコードするだけでなく、一種のプリアンブル(を介してorg.apache.avro.file.{DataFileWriter/DataFileReader})にAvroスキーマを埋め込みますスキーマをAvroファイルに埋め込むことは、(a)通常、Avroファイルの「ペイロード」が埋め込まれたAvroスキーマよりも桁違いに大きく、(b)次に、これらのファイルを思いのままにコピーまたは移動できるため、非常に意味があります。それでも、誰かに相談することなく、もう一度読むことができることを確認してください。
  2. データレコードのみをエンコードするには、つまりスキーマを埋め込まないようにします(を使用org.apache.avro.io.{BinaryEncoder/BinaryDecoder}して、パッケージ名の違いに注意してください:ioこことfile上記)。たとえば、上記のバリアント1と比較して、Avroスキーマをすべての単一のメッセージに再埋め込みするオーバーヘッドが発生しないため、このアプローチは、たとえばKafkaトピックに書き込まれるAvroエンコードメッセージでよく使用されます。 (非常に合理的な)ポリシーは、同じKafkaトピックに対して、メッセージは同じAvroスキーマでフォーマット/エンコードされることです。ストリームデータのコンテキストでは、移動中のデータレコードは通常、上記の静止データのAvroファイル(通常は数百または数百または数百KB)よりもはるかに小さい(通常100バイトから数百KB)ため、これは重要な利点です。数千MB); したがって、Avroスキーマのサイズは比較的大きいため、Kafkaに2000データレコードを書き込むときに2000xスキーマを埋め込む必要はありません。欠点は、「どういうわけか」AvroスキーマがKafkaトピックにどのようにマップされるかを追跡します。より正確には、スキーマを直接埋め込む方法を使わずに、メッセージがどのAvroスキーマでエンコードされたかを追跡する必要があります。良いニュースはこれを透過的に行うためのKafkaエコシステム(Avroスキーマレジストリ)利用可能なツールしたがって、バリアント1と比較して、バリアント2は利便性を犠牲にして効率を向上させます。

その結果、エンコードされたAvroデータの「ワイヤー形式」は、上記の(1)と(2)のどちらを使用するかによって異なります。

私はApache Nifiにはあまり詳しくありませんが、ソースコード(例:ConvertAvroToJSON.java)をざっと見てみると、バリアント1を使用していることがわかります。つまり、AvroスキーマとAvroレコードが埋め込まれています。ただし、コンシューマコードはDecoderFactory.get().binaryDecoder()バリアント2を使用しているため、スキーマは埋め込まれていません。

多分これはあなたが遭遇しているエラーを説明していますか?

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

分類Dev

Kafka + Kubernetes + Helm + `/ usr / bin / kafka-avro-console-consumer`?

分類Dev

Unknown magic byte with kafka-avro-console-consumer

分類Dev

How to utilize existing avro schema for my kafka consumer?

分類Dev

Confluentのkafka-avro-console-consumerはログファイルをどこに書き込みますか?

分類Dev

kafka-consumer-groupsコマンドの問題

分類Dev

Kafka Avro Schema evolution

分類Dev

kafka-avro-console-consumer:スキーマレジストリのトラストストアの場所を指定します

分類Dev

Kafka Consumer with JAVA

分類Dev

Spring boot Kafka consumer

分類Dev

kafka-avro-console-consumerを使用してavroメッセージを読み取ることができません。SerializationException:不明なマジックバイト

分類Dev

Kafka consumer manual commit offset

分類Dev

Kafka Stream: Consumer commit frequency

分類Dev

Partition specific flink kafka consumer

分類Dev

Kafka elixir consumer keeps crashing

分類Dev

kafka consumer code is not running completely

分類Dev

Kafka consumer hangs on poll when kafka is down

分類Dev

Kafka AVRO - conversion from long to datetime

分類Dev

Should Avro be used to on both the key and value in Kafka?

分類Dev

NPE while deserializing avro messages in kafka streams

分類Dev

Cloudflow is unable to read avro message from kafka

分類Dev

Kafka consumer - what's the relation of consumer processes and threads with topic partitions

分類Dev

Kafka consumer group script to see all consumer group not working

分類Dev

kafkaブートストラップ-サーバーとkafka-console-consumerのzookeeper

分類Dev

Kafka__consumer_offsetsトピックからの読み方

分類Dev

Kafka Consumer Group:新しいメッセージのみを読む

分類Dev

Better way of error handling in Kafka Consumer

分類Dev

Kafka Consumer get assigned partitions for a specific topic

分類Dev

How do I implement Kafka Consumer in Scala

分類Dev

Kafka consumer does not start from latest message

Related 関連記事

  1. 1

    Kafka + Kubernetes + Helm + `/ usr / bin / kafka-avro-console-consumer`?

  2. 2

    Unknown magic byte with kafka-avro-console-consumer

  3. 3

    How to utilize existing avro schema for my kafka consumer?

  4. 4

    Confluentのkafka-avro-console-consumerはログファイルをどこに書き込みますか?

  5. 5

    kafka-consumer-groupsコマンドの問題

  6. 6

    Kafka Avro Schema evolution

  7. 7

    kafka-avro-console-consumer:スキーマレジストリのトラストストアの場所を指定します

  8. 8

    Kafka Consumer with JAVA

  9. 9

    Spring boot Kafka consumer

  10. 10

    kafka-avro-console-consumerを使用してavroメッセージを読み取ることができません。SerializationException:不明なマジックバイト

  11. 11

    Kafka consumer manual commit offset

  12. 12

    Kafka Stream: Consumer commit frequency

  13. 13

    Partition specific flink kafka consumer

  14. 14

    Kafka elixir consumer keeps crashing

  15. 15

    kafka consumer code is not running completely

  16. 16

    Kafka consumer hangs on poll when kafka is down

  17. 17

    Kafka AVRO - conversion from long to datetime

  18. 18

    Should Avro be used to on both the key and value in Kafka?

  19. 19

    NPE while deserializing avro messages in kafka streams

  20. 20

    Cloudflow is unable to read avro message from kafka

  21. 21

    Kafka consumer - what's the relation of consumer processes and threads with topic partitions

  22. 22

    Kafka consumer group script to see all consumer group not working

  23. 23

    kafkaブートストラップ-サーバーとkafka-console-consumerのzookeeper

  24. 24

    Kafka__consumer_offsetsトピックからの読み方

  25. 25

    Kafka Consumer Group:新しいメッセージのみを読む

  26. 26

    Better way of error handling in Kafka Consumer

  27. 27

    Kafka Consumer get assigned partitions for a specific topic

  28. 28

    How do I implement Kafka Consumer in Scala

  29. 29

    Kafka consumer does not start from latest message

ホットタグ

アーカイブ