我正在使用Debezium SQL Server连接器进行更改数据捕获,并且连接器会自动生成架构并将架构注册到架构注册表中,这意味着我没有Avro架构文件。在这种情况下,如何编写使用此架构读取数据的使用者?我已经看到很多文章使用avro模式文件为使用者读取数据,并且在模式注册表中只有一个用于此有效负载的模式。
如果我在本地创建avro文件,并让我的使用者使用它,那么我必须注册一个具有不同名称的重复模式。
我的问题是如何使用由kafka连接器注册的此模式编写Java使用者API。非常感谢。
这是我的价值模式:
{"subject":"new.dbo.locations-value","version":1,"id":102,"schema":"{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"new.dbo.locations\",\"fields\":[{\"name\":\"before\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"display_id\",\"type\":\"string\"},{\"name\":\"first_name\",\"type\":\"string\"},{\"name\":\"last_name\",\"type\":\"string\"},{\"name\":\"location_id\",\"type\":\"string\"},{\"name\":\"location_name\",\"type\":\"string\"},{\"name\":\"location_sub_type_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"location_time_zone\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"parent_organization_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"type\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"new.dbo.locations.Value\"}],\"default\":null},{\"name\":\"after\",\"type\":[\"null\",\"Value\"],\"default\":null},{\"name\":\"source\",\"type\":{\"type\":\"record\",\"name\":\"Source\",\"namespace\":\"io.debezium.connector.sqlserver\",\"fields\":[{\"name\":\"version\",\"type\":\"string\"},{\"name\":\"connector\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":\"long\"},{\"name\":\"snapshot\",\"type\":[{\"type\":\"string\",\"connect.version\":1,\"connect.parameters\":{\"allowed\":\"true,last,false\"},\"connect.default\":\"false\",\"connect.name\":\"io.debezium.data.Enum\"},\"null\"],\"default\":\"false\"},{\"name\":\"db\",\"type\":\"string\"},{\"name\":\"schema\",\"type\":\"string\"},{\"name\":\"table\",\"type\":\"string\"},{\"name\":\"change_lsn\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"commit_lsn\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"event_serial_no\",\"type\":[\"null\",\"long\"],\"default\":null}],\"connect.name\":\"io.debezium.connector.sqlserver.Source\"}},{\"name\":\"op\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"transaction\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"ConnectDefault\",\"namespace\":\"io.confluent.connect.avro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"total_order\",\"type\":\"long\"},{\"name\":\"data_collection_order\",\"type\":\"long\"}]}],\"default\":null}],\"connect.name\":\"new.dbo.locations.Envelope\"}"}%
您不需要本地模式文件。您可以使用来使用KafkaConsumer<?, GenericRecord>
,这将使解串器下载并为每个消息缓存各自的ID +模式。
这种方法的缺点是您在解析数据时需要小心(非常类似于原始JSON)
如果您需要静态模式和允许严格类型检查的已编译类,请从注册表中下载该文件,网址为: /subjects/:name/versions/latest
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句