それぞれのスキーマのデータに対してAvroでKafkaコンシューマーを実行しようとすると、「AvroRuntimeException:Malformed data。Length is negative:-40」というエラーが返されます。他にもバイト配列をjson、Avroの書き込みと読み取り、Kafka Avro Binary * coderに変換する際に同様の問題が発生したようです。私もこのコンシューマグループの例を参照しましたが、これはすべて役に立ちましたが、これまでのところこのエラーには役立ちません。コードのこの部分(73行目)まで機能します。
デコーダーデコーダー= DecoderFactory.get()。binaryDecoder(byteArrayInputStream、null);
私は他のデコーダーを試し、byteArrayInputStream変数の内容を出力しました。これは、シリアル化されたavroデータがどのように見えると私が思うと思いますか(メッセージで、スキーマと一部のデータといくつかの不正なデータを確認できます)を出力しました。 594を返す.available()メソッドを使用して利用可能なバイト。このエラーが発生している理由を理解できません。Apache Nifiを使用して、hdfsから同じスキーマでKafkaストリームを生成します。何か助けていただければ幸いです。
おそらく、問題は、NifiによるAvroデータの書き込み(エンコード)方法と、コンシューマーアプリによるデータの読み取り(デコード)方法の不一致です。
一言で言えば、AvroのAPIは、シリアル化に対して2つの異なるアプローチを提供します。
org.apache.avro.file.{DataFileWriter/DataFileReader}
)にAvroスキーマを埋め込みます。スキーマをAvroファイルに埋め込むことは、(a)通常、Avroファイルの「ペイロード」が埋め込まれたAvroスキーマよりも桁違いに大きく、(b)次に、これらのファイルを思いのままにコピーまたは移動できるため、非常に意味があります。それでも、誰かに相談することなく、もう一度読むことができることを確認してください。org.apache.avro.io.{BinaryEncoder/BinaryDecoder}
して、パッケージ名の違いに注意してください:io
こことfile
上記)。たとえば、上記のバリアント1と比較して、Avroスキーマをすべての単一のメッセージに再埋め込みするオーバーヘッドが発生しないため、このアプローチは、たとえばKafkaトピックに書き込まれるAvroエンコードメッセージでよく使用されます。 (非常に合理的な)ポリシーは、同じKafkaトピックに対して、メッセージは同じAvroスキーマでフォーマット/エンコードされることです。ストリームデータのコンテキストでは、移動中のデータレコードは通常、上記の静止データのAvroファイル(通常は数百または数百または数百KB)よりもはるかに小さい(通常100バイトから数百KB)ため、これは重要な利点です。数千MB); したがって、Avroスキーマのサイズは比較的大きいため、Kafkaに2000データレコードを書き込むときに2000xスキーマを埋め込む必要はありません。欠点は、「どういうわけか」AvroスキーマがKafkaトピックにどのようにマップされるかを追跡します。より正確には、スキーマを直接埋め込む方法を使わずに、メッセージがどのAvroスキーマでエンコードされたかを追跡する必要があります。良いニュースはこれを透過的に行うためのKafkaエコシステム(Avroスキーマレジストリ)で利用可能なツール。したがって、バリアント1と比較して、バリアント2は利便性を犠牲にして効率を向上させます。その結果、エンコードされたAvroデータの「ワイヤー形式」は、上記の(1)と(2)のどちらを使用するかによって異なります。
私はApache Nifiにはあまり詳しくありませんが、ソースコード(例:ConvertAvroToJSON.java)をざっと見てみると、バリアント1を使用していることがわかります。つまり、AvroスキーマとAvroレコードが埋め込まれています。ただし、コンシューマコードはDecoderFactory.get().binaryDecoder()
バリアント2を使用しているため、スキーマは埋め込まれていません。
多分これはあなたが遭遇しているエラーを説明していますか?
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加