私が使用していコンフルエントのカフカが接続S3 AWS S3にapacheのカフカからデータをコピーします。
問題は、Confluent Schema RegistryのAvroシリアライザーを使用していないAVRO形式のKafkaデータがあり、Kafkaプロデューサーを変更できないことです。そのため、Kafkaから既存のAvroデータを逆シリアル化し、AWSS3で寄木細工の形式で同じものを永続化する必要があります。confluentのAvroConverterをこのような値コンバーターとして使用してみました-
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost/api/v1/avro
そして、私はこのエラーが発生しています-
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic dcp-all to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:488)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
私の知る限り、「io.confluent.connect.avro.AvroConverter」は、データがConfluent Schema RegistryのAvroシリアライザーを使用してKafkaで書き込まれた場合にのみ機能するため、このエラーが発生します。だから私の質問は、この場合、汎用のAvroConverterを実装する必要がありますか?はいの場合、既存のソースコードを拡張するにはどうすればよいですか?https://github.com/confluentinc/kafka-connect-storage-cloud?
ここでの助けはありがたいです。
そのリポジトリを拡張する必要はありません。BlueApronがProtobufに対して行ったように、Converter
(Apache Kafkaの一部)シェードをJARに実装してから、Connectワーカーに配置する必要があります。CLASSPATH
または、これが機能するかどうかを確認します-https://github.com/farmdawgnation/registryless-avro-converter
Confluentスキーマレジストリを使用しない
そして、どのようなレジストリされますが使用して?私が知っているそれぞれには、Confluentのものとインターフェースするための構成があります
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加