如何为kafka用户使用现有的avro模式?

Shan Lu

我正在使用Debezium SQL Server连接器进行更改数据捕获,并且连接器会自动生成架构并将架构注册到架构注册表中,这意味着我没有Avro架构文件。在这种情况下,如何编写使用此架构读取数据的使用者?我已经看到很多文章使用avro模式文件为使用者读取数据,并且在模式注册表中只有一个用于此有效负载的模式。

如果我在本地创建avro文件,并让我的使用者使用它,那么我必须注册一个具有不同名称的重复模式。

我的问题是如何使用由kafka连接器注册的此模式编写Java使用者API。非常感谢。

这是我的价值模式:

{"subject":"new.dbo.locations-value","version":1,"id":102,"schema":"{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"new.dbo.locations\",\"fields\":[{\"name\":\"before\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"display_id\",\"type\":\"string\"},{\"name\":\"first_name\",\"type\":\"string\"},{\"name\":\"last_name\",\"type\":\"string\"},{\"name\":\"location_id\",\"type\":\"string\"},{\"name\":\"location_name\",\"type\":\"string\"},{\"name\":\"location_sub_type_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"location_time_zone\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"parent_organization_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"type\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"new.dbo.locations.Value\"}],\"default\":null},{\"name\":\"after\",\"type\":[\"null\",\"Value\"],\"default\":null},{\"name\":\"source\",\"type\":{\"type\":\"record\",\"name\":\"Source\",\"namespace\":\"io.debezium.connector.sqlserver\",\"fields\":[{\"name\":\"version\",\"type\":\"string\"},{\"name\":\"connector\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":\"long\"},{\"name\":\"snapshot\",\"type\":[{\"type\":\"string\",\"connect.version\":1,\"connect.parameters\":{\"allowed\":\"true,last,false\"},\"connect.default\":\"false\",\"connect.name\":\"io.debezium.data.Enum\"},\"null\"],\"default\":\"false\"},{\"name\":\"db\",\"type\":\"string\"},{\"name\":\"schema\",\"type\":\"string\"},{\"name\":\"table\",\"type\":\"string\"},{\"name\":\"change_lsn\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"commit_lsn\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"event_serial_no\",\"type\":[\"null\",\"long\"],\"default\":null}],\"connect.name\":\"io.debezium.connector.sqlserver.Source\"}},{\"name\":\"op\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"transaction\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"ConnectDefault\",\"namespace\":\"io.confluent.connect.avro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"total_order\",\"type\":\"long\"},{\"name\":\"data_collection_order\",\"type\":\"long\"}]}],\"default\":null}],\"connect.name\":\"new.dbo.locations.Envelope\"}"}%
板球运动员

您不需要本地模式文件。您可以使用来使用KafkaConsumer<?, GenericRecord>,这将使解串器下载并为每个消息缓存各自的ID +模式。

这种方法的缺点是您在解析数据时需要小心(非常类似于原始JSON)

如果您需要静态模式和允许严格类型检查的已编译类,请从注册表中下载该文件,网址为: /subjects/:name/versions/latest

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

如何为现有的SVN转换服务器添加新的用户帐户?

来自分类Dev

如何使用Python将数据附加到现有的AVRO文件中

来自分类Dev

如何为新的面板类重复使用现有的布局代码?

来自分类Dev

如何使用现有的用户模型在Django中定义多种用户类型?

来自分类Dev

如何为现有的Java组件添加新功能?

来自分类Dev

如何为现有的全局函数提供TypeScript批注

来自分类Dev

如何为现有的Maven项目创建EAR文件

来自分类Dev

如何为现有的Rails应用构建版本API?

来自分类Dev

如何为现有的风帆项目设置续集

来自分类Dev

如何为现有的ruby项目创建Java接口

来自分类Dev

如何为现有的Java组件添加新功能?

来自分类Dev

如何为现有的div添加父div

来自分类Dev

如何为现有的postgres数据库创建用户(读写和只读)和管理员角色?

来自分类Dev

如何使ServiceStack与现有的MVC / Service / Repository模式一起使用

来自分类Dev

如何使用$ routeProvider在AngularJS 1.x的ui模式下打开现有的控制器/视图

来自分类Dev

如何使用现有的已登录Chrome用户配置文件正确设置VS Code以在Chrome中调试?

来自分类Dev

如何在Dockerfile中使用现有的Docker Volume

来自分类Dev

如何使用现有的fork更新我的github库?

来自分类Dev

如何使用Eclipse打开现有的C ++项目?

来自分类Dev

如何快速使用现有的SQLite数据库?

来自分类Dev

随着SBT的发展,如何使用现有的Intellij项目?

来自分类Dev

如何使用现有的customerId处理Braintree付款

来自分类Dev

如何在Eclipse中使用现有的.target文件?

来自分类Dev

如何使用JMX监视现有的Java类?

来自分类Dev

如何对现有的API使用Amazon API网关?

来自分类Dev

如何使用VueJS增强现有的Laravel项目?

来自分类Dev

如何使用计算目标获取现有的AKS

来自分类Dev

如何使用AWS CDK查找现有的ApiGateway

来自分类Dev

如何使用AWS CDK查找现有的ApiGateway

Related 相关文章

  1. 1

    如何为现有的SVN转换服务器添加新的用户帐户?

  2. 2

    如何使用Python将数据附加到现有的AVRO文件中

  3. 3

    如何为新的面板类重复使用现有的布局代码?

  4. 4

    如何使用现有的用户模型在Django中定义多种用户类型?

  5. 5

    如何为现有的Java组件添加新功能?

  6. 6

    如何为现有的全局函数提供TypeScript批注

  7. 7

    如何为现有的Maven项目创建EAR文件

  8. 8

    如何为现有的Rails应用构建版本API?

  9. 9

    如何为现有的风帆项目设置续集

  10. 10

    如何为现有的ruby项目创建Java接口

  11. 11

    如何为现有的Java组件添加新功能?

  12. 12

    如何为现有的div添加父div

  13. 13

    如何为现有的postgres数据库创建用户(读写和只读)和管理员角色?

  14. 14

    如何使ServiceStack与现有的MVC / Service / Repository模式一起使用

  15. 15

    如何使用$ routeProvider在AngularJS 1.x的ui模式下打开现有的控制器/视图

  16. 16

    如何使用现有的已登录Chrome用户配置文件正确设置VS Code以在Chrome中调试?

  17. 17

    如何在Dockerfile中使用现有的Docker Volume

  18. 18

    如何使用现有的fork更新我的github库?

  19. 19

    如何使用Eclipse打开现有的C ++项目?

  20. 20

    如何快速使用现有的SQLite数据库?

  21. 21

    随着SBT的发展,如何使用现有的Intellij项目?

  22. 22

    如何使用现有的customerId处理Braintree付款

  23. 23

    如何在Eclipse中使用现有的.target文件?

  24. 24

    如何使用JMX监视现有的Java类?

  25. 25

    如何对现有的API使用Amazon API网关?

  26. 26

    如何使用VueJS增强现有的Laravel项目?

  27. 27

    如何使用计算目标获取现有的AKS

  28. 28

    如何使用AWS CDK查找现有的ApiGateway

  29. 29

    如何使用AWS CDK查找现有的ApiGateway

热门标签

归档