How to use an existing Avro schema for my Kafka consumer?

Shan Lu

I am using the Debezium SQL Server Connector for change data capture. The connector generates the schema automatically and registers it in the Schema Registry, which means I don't have an Avro schema file. How can I write a consumer that reads data with this schema? All the articles I have seen give the consumer a local Avro schema file to read the data, and there should be only one schema for this payload in the registry.

If I create an Avro schema file locally and have my consumer use it, I would have to register a duplicate schema under a different name.

My question is: how do I write a Java consumer for the schema that the Kafka connector registered? Thank you so much.

Here is my value schema:

(subject: new.dbo.locations-value, version: 1, id: 102; the schema string unescaped and pretty-printed for readability)

{
  "type": "record",
  "name": "Envelope",
  "namespace": "new.dbo.locations",
  "fields": [
    {
      "name": "before",
      "type": [
        "null",
        {
          "type": "record",
          "name": "Value",
          "fields": [
            { "name": "id", "type": "long" },
            { "name": "display_id", "type": "string" },
            { "name": "first_name", "type": "string" },
            { "name": "last_name", "type": "string" },
            { "name": "location_id", "type": "string" },
            { "name": "location_name", "type": "string" },
            { "name": "location_sub_type_id", "type": ["null", "string"], "default": null },
            { "name": "location_time_zone", "type": ["null", "string"], "default": null },
            { "name": "parent_organization_id", "type": ["null", "string"], "default": null },
            { "name": "type", "type": ["null", "string"], "default": null }
          ],
          "connect.name": "new.dbo.locations.Value"
        }
      ],
      "default": null
    },
    { "name": "after", "type": ["null", "Value"], "default": null },
    {
      "name": "source",
      "type": {
        "type": "record",
        "name": "Source",
        "namespace": "io.debezium.connector.sqlserver",
        "fields": [
          { "name": "version", "type": "string" },
          { "name": "connector", "type": "string" },
          { "name": "name", "type": "string" },
          { "name": "ts_ms", "type": "long" },
          {
            "name": "snapshot",
            "type": [
              {
                "type": "string",
                "connect.version": 1,
                "connect.parameters": { "allowed": "true,last,false" },
                "connect.default": "false",
                "connect.name": "io.debezium.data.Enum"
              },
              "null"
            ],
            "default": "false"
          },
          { "name": "db", "type": "string" },
          { "name": "schema", "type": "string" },
          { "name": "table", "type": "string" },
          { "name": "change_lsn", "type": ["null", "string"], "default": null },
          { "name": "commit_lsn", "type": ["null", "string"], "default": null },
          { "name": "event_serial_no", "type": ["null", "long"], "default": null }
        ],
        "connect.name": "io.debezium.connector.sqlserver.Source"
      }
    },
    { "name": "op", "type": "string" },
    { "name": "ts_ms", "type": ["null", "long"], "default": null },
    {
      "name": "transaction",
      "type": [
        "null",
        {
          "type": "record",
          "name": "ConnectDefault",
          "namespace": "io.confluent.connect.avro",
          "fields": [
            { "name": "id", "type": "string" },
            { "name": "total_order", "type": "long" },
            { "name": "data_collection_order", "type": "long" }
          ]
        }
      ],
      "default": null
    }
  ],
  "connect.name": "new.dbo.locations.Envelope"
}
OneCricketeer

You don't need a local schema file. You can consume with a KafkaConsumer<?, GenericRecord>, which lets the deserializer look up and cache the schema for each message by the schema ID embedded in it.
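A minimal sketch of that approach, assuming a local broker and registry (the bootstrap.servers and schema.registry.url values, the group id, and the topic name new.dbo.locations are placeholders for your environment; KafkaAvroDeserializer comes from Confluent's kafka-avro-serializer dependency):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class LocationsCdcConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");          // placeholder
        props.put("group.id", "locations-cdc-reader");             // placeholder
        props.put("key.deserializer", StringDeserializer.class.getName());
        // The Avro deserializer fetches the writer schema from the registry
        // by the ID embedded in each message, then caches it.
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://localhost:8081"); // placeholder
        props.put("auto.offset.reset", "earliest");

        try (KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("new.dbo.locations"));
            while (true) {
                ConsumerRecords<String, GenericRecord> records =
                        consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, GenericRecord> record : records) {
                    GenericRecord envelope = record.value();
                    System.out.println(envelope.get("op") + " -> " + envelope.get("after"));
                }
            }
        }
    }
}
```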

The downside of this approach is that you need to be careful when navigating the data, much as with raw JSON: every field access is untyped.
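For example, "before" and "after" in the Debezium envelope are nullable unions, so a GenericRecord consumer has to cast and null-check each level by hand. A self-contained sketch (the schema here is a trimmed-down stand-in for the envelope in the question, built in memory so it runs without a broker):

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class EnvelopeFieldAccess {
    public static void main(String[] args) {
        // Trimmed-down stand-in for the Debezium envelope: "after" is a nullable union.
        String schemaJson =
            "{\"type\":\"record\",\"name\":\"Envelope\",\"fields\":["
          + "{\"name\":\"op\",\"type\":\"string\"},"
          + "{\"name\":\"after\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Value\","
          + "\"fields\":[{\"name\":\"id\",\"type\":\"long\"},"
          + "{\"name\":\"location_name\",\"type\":\"string\"}]}],\"default\":null}]}";
        Schema envelopeSchema = new Schema.Parser().parse(schemaJson);

        // Build a sample record the way the deserializer would hand it to you.
        GenericRecord after = new GenericData.Record(
            envelopeSchema.getField("after").schema().getTypes().get(1)); // branch 1 = Value
        after.put("id", 42L);
        after.put("location_name", "HQ");
        GenericRecord envelope = new GenericData.Record(envelopeSchema);
        envelope.put("op", "c");
        envelope.put("after", after);

        // Every access is untyped: cast, null-check, and toString() by hand.
        GenericRecord a = (GenericRecord) envelope.get("after");
        if (a != null) {                       // "after" is null for delete events
            long id = (Long) a.get("id");
            String name = a.get("location_name").toString(); // may be Utf8, not String
            System.out.println(id + " " + name);
        }
    }
}
```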

If you need a static schema and a compiled class that allows strict type checking, then download the schema from the registry at /subjects/:name/versions/latest and generate a class from it.
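A sketch of fetching it with the JDK's built-in HTTP client (the registry URL is a placeholder; the subject name comes from this question, and the endpoint is the standard Confluent Schema Registry REST API). The "schema" field of the response can be saved to a .avsc file and fed to avro-tools compile or the avro-maven-plugin to generate a SpecificRecord class:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class FetchLatestSchema {
    public static void main(String[] args) throws Exception {
        String registry = "http://localhost:8081";   // placeholder
        String subject = "new.dbo.locations-value";
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(registry + "/subjects/" + subject + "/versions/latest"))
            .header("Accept", "application/vnd.schemaregistry.v1+json")
            .GET()
            .build();
        // Body is JSON: {"subject":..., "version":..., "id":..., "schema":"<escaped Avro schema>"}
        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}
```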
