コンフルエントなkafka-connect-s3用のカスタムAvroConverterの実装

牧草地

私が使用していコンフルエントのカフカが接続S3 AWS S3にapacheのカフカからデータをコピーします。

問題は、Confluent Schema RegistryのAvroシリアライザーを使用していないAVRO形式のKafkaデータがあり、Kafkaプロデューサーを変更できないことです。そのため、Kafkaから既存のAvroデータを逆シリアル化し、AWSS3で寄木細工の形式で同じものを永続化する必要があります。confluentのAvroConverterをこのような値コンバーターとして使用してみました-

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost/api/v1/avro

そして、私はこのエラーが発生しています-

Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic dcp-all to Avro: 
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:488)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

私の知る限り、「io.confluent.connect.avro.AvroConverter」は、データがConfluent Schema RegistryのAvroシリアライザーを使用してKafkaで書き込まれた場合にのみ機能するため、このエラーが発生します。だから私の質問は、この場合、汎用のAvroConverterを実装する必要がありますか?はいの場合、既存のソースコードを拡張するにはどうすればよいですか?https://github.com/confluentinc/kafka-connect-storage-cloud

ここでの助けはありがたいです。

OneCricketeer

そのリポジトリを拡張する必要はありません。BlueApronがProtobufに対して行ったようにConverter(Apache Kafkaの一部)シェードをJAR実装してから、Connectワーカーに配置する必要があります。CLASSPATH

または、これが機能するかどうかを確認します-https://github.com/farmdawgnation/registryless-avro-converter


Confluentスキーマレジストリを使用しない

そして、どのようなレジストリされますが使用して?私が知っているそれぞれには、Confluentのものとインターフェースするための構成があります

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

分類Dev

Kafka Connect S3動的S3フォルダー構造の作成?

分類Dev

Kafka Connect実装エラー

分類Dev

アーティファクトio.confluent:kafka-connect-storage-common-parent:pom:6.0.0-SNAPSHOTをコンフルエント($ {confluent.maven.repo})との間で転送できませんでした

分類Dev

カスタムgetline()の実装-whileループ内用

分類Dev

自分のIDをプッシュします。コンフルエントなkafkaはelasticsearchdockerを接続します

分類Dev

コンフルエントなKafkaでスキーマ名戦略を変更する

分類Dev

s3バケットのカスタムドメイン?

分類Dev

カスタムタイムアウト実装のコルーチン

分類Dev

Kafka Connectログのマスキング?

分類Dev

Connect-AzAccount:「Connect-AzAccount」という用語は、コマンドレット、関数、スクリプトファイル、または操作可能なプログラムの名前として認識されません。

分類Dev

Connect-AzAccount:「Connect-AzAccount」という用語は、コマンドレット、関数、スクリプトファイル、または操作可能なプログラムの名前として認識されません。

分類Dev

Oracle CONNECT BY構文を使用しないSQLの階層的スパンオブコントロールレポート?

分類Dev

kafkaトピックからコンフルエントなelasticsearchへのストリームをどのように取得しますか?

分類Dev

cp-kafka-connectはオープンソースまたは独自のコンポーネントですか?

分類Dev

クラウドコンフルエントなkafkaのインストールがubuntu18で失敗しました

分類Dev

コンフルエントなKSQLでのヌル処理

分類Dev

コンフルエントなkafkaでLensesMQTT Sourceコネクタを使用すると、メッセージフローが断続的になります

分類Dev

コンフルエントなKubernetesヘルムチャートを使用したKafka =スキーマレジストリWakeupException

分類Dev

DockerコンテナーのKafka-connectログ

分類Dev

コンフルエントなkafkaは、Pythonでストリーミング、グループ化、集約を提供しますか?

分類Dev

kafka-connect:コネクタシンクcassandraの分散構成でエラーが発生する

分類Dev

私の優先度つきキュー用のカスタムコンパレータを実装

分類Dev

VSIXでのカスタムコマンドの非同期実装

分類Dev

STRUCTタイプのデフォルト値を設定すると、Kafka Connect APIエラーが発生する

分類Dev

react-redux connect()-edコンテナーは、componentDidMountのようなライフサイクルメソッドを実装できますか?

分類Dev

react-redux connect()-edコンテナーは、componentDidMountのようなライフサイクルメソッドを実装できますか?

分類Dev

クイルブロット用のカスタムエディタを実装する

分類Dev

Kafka Connectでカスタムコンバーターを使用していますか?

分類Dev

カスタムWPFコマンドの実装

Related 関連記事

  1. 1

    Kafka Connect S3動的S3フォルダー構造の作成?

  2. 2

    Kafka Connect実装エラー

  3. 3

    アーティファクトio.confluent:kafka-connect-storage-common-parent:pom:6.0.0-SNAPSHOTをコンフルエント($ {confluent.maven.repo})との間で転送できませんでした

  4. 4

    カスタムgetline()の実装-whileループ内用

  5. 5

    自分のIDをプッシュします。コンフルエントなkafkaはelasticsearchdockerを接続します

  6. 6

    コンフルエントなKafkaでスキーマ名戦略を変更する

  7. 7

    s3バケットのカスタムドメイン?

  8. 8

    カスタムタイムアウト実装のコルーチン

  9. 9

    Kafka Connectログのマスキング?

  10. 10

    Connect-AzAccount:「Connect-AzAccount」という用語は、コマンドレット、関数、スクリプトファイル、または操作可能なプログラムの名前として認識されません。

  11. 11

    Connect-AzAccount:「Connect-AzAccount」という用語は、コマンドレット、関数、スクリプトファイル、または操作可能なプログラムの名前として認識されません。

  12. 12

    Oracle CONNECT BY構文を使用しないSQLの階層的スパンオブコントロールレポート?

  13. 13

    kafkaトピックからコンフルエントなelasticsearchへのストリームをどのように取得しますか?

  14. 14

    cp-kafka-connectはオープンソースまたは独自のコンポーネントですか?

  15. 15

    クラウドコンフルエントなkafkaのインストールがubuntu18で失敗しました

  16. 16

    コンフルエントなKSQLでのヌル処理

  17. 17

    コンフルエントなkafkaでLensesMQTT Sourceコネクタを使用すると、メッセージフローが断続的になります

  18. 18

    コンフルエントなKubernetesヘルムチャートを使用したKafka =スキーマレジストリWakeupException

  19. 19

    DockerコンテナーのKafka-connectログ

  20. 20

    コンフルエントなkafkaは、Pythonでストリーミング、グループ化、集約を提供しますか?

  21. 21

    kafka-connect:コネクタシンクcassandraの分散構成でエラーが発生する

  22. 22

    私の優先度つきキュー用のカスタムコンパレータを実装

  23. 23

    VSIXでのカスタムコマンドの非同期実装

  24. 24

    STRUCTタイプのデフォルト値を設定すると、Kafka Connect APIエラーが発生する

  25. 25

    react-redux connect()-edコンテナーは、componentDidMountのようなライフサイクルメソッドを実装できますか?

  26. 26

    react-redux connect()-edコンテナーは、componentDidMountのようなライフサイクルメソッドを実装できますか?

  27. 27

    クイルブロット用のカスタムエディタを実装する

  28. 28

    Kafka Connectでカスタムコンバーターを使用していますか?

  29. 29

    カスタムWPFコマンドの実装

ホットタグ

アーカイブ