无法从 debezium-postgres 的 kafka-stream 读取 kafka-stream 数据

阿比纳布·坎拉尔

我使用以下命令启动了 kafka 连接器:

./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-postgres/connect-postgres.properties 

connect-avro-standalone.properties 中的序列化道具是:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

我创建了一个 java 后端,它监听这个 kafka 流主题,并且它能够通过每次添加/更新/删除从 postgres 获取数据。
但是数据是以某种未知的编码格式传入的,这就是为什么我无法正确读取数据的原因。
这是相关的代码片段:

properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.ByteArray().getClass().getName());

StreamsBuilder streamsBuilder = new StreamsBuilder();

final Serde<String> stringSerde = Serdes.String();
final Serde<byte[]> byteSerde = Serdes.ByteArray();

streamsBuilder.stream(Pattern.compile(getTopic()), Consumed.with(stringSerde, byteSerde))
.mapValues(data -> {
  System.out.println("->"+new String(data));
  return data;
});

我对我需要改变的地方和内容感到困惑;在 avro 连接器道具中或在 java 端代码中

罗宾·莫法特

此处的 Kafka Connect 配置意味着 Kafka 主题上的消息将被 Avro 序列化:

value.converter=io.confluent.connect.avro.AvroConverter

这意味着您需要在 Streams 应用程序中使用 Avro 进行反序列化。有关更多详细信息,请参见此处:https : //docs.confluent.io/current/streams/developer-guide/datatypes.html#avro

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

无法使用Spark读取kafka主题数据

来自分类Dev

StreamBuilder无法按预期从Stream读取数据

来自分类Dev

spring.cloud.stream.kafka.binder.headers无法正常工作

来自分类Dev

Kafka和TextSocket Stream中的Spark Streaming数据分发

来自分类Dev

Spring Cloud Stream 向 Kafka 主题发送数据失败

来自分类Dev

flink从kafka读取数据

来自分类Dev

Postgres Debezium CDC不会发布对Kafka的更改

来自分类Dev

带 kafka 的 Debezium 还是仅嵌入的 Debezium?

来自分类Dev

在Kafka Stream中执行异步转换

来自分类Dev

Spark Stream Kafka和Hbase Config

来自分类Dev

Spring Cloud Stream Kafka记录太大

来自分类Dev

Kafka Stream的输出速率与窗口不同

来自分类Dev

Spark Stream Kafka和Hbase Config

来自分类Dev

Spring Cloud Stream Kafka 错误通道

来自分类Dev

Kafka Stream 自定义状态存储

来自分类Dev

哪个版本的 Kafka Stream 会更高效?

来自分类Dev

Spring Kafka/Spring Cloud Stream 如何保证涉及数据库和 Kafka 的事务性/原子性?

来自分类Dev

如果kafka connect的debezium源连接器无法对kafka产生影响怎么办

来自分类Dev

Kafka Connect JDBC与Debezium CDC

来自分类Dev

使用Spring Cloud Stream Kafka Binder时无法设置groupId和clientId

来自分类Dev

Kafka Stream:应用程序重新启动时的Kafka Windowed Stream行为

来自分类Dev

Spring Cloud Stream Kafka Stream 与本机 Kafka Stream 应用程序和生产者之间的 Avro 消息不兼容

来自分类Dev

Android-Stream无法正确读取数组

来自分类Dev

ImageMagick 的 Stream 无法读取 TIFF64?

来自分类Dev

Storm从Java中的Kafka读取数据

来自分类Dev

如何从flink访问/读取kafka主题数据?

来自分类常见问题

Spring Cloud Stream在Kafka Streams流程中出现Serde错误

来自分类Dev

将Kafka Stream输入输出到控制台?

来自分类Dev

spring-cloud-stream-kafka错误处理

Related 相关文章

  1. 1

    无法使用Spark读取kafka主题数据

  2. 2

    StreamBuilder无法按预期从Stream读取数据

  3. 3

    spring.cloud.stream.kafka.binder.headers无法正常工作

  4. 4

    Kafka和TextSocket Stream中的Spark Streaming数据分发

  5. 5

    Spring Cloud Stream 向 Kafka 主题发送数据失败

  6. 6

    flink从kafka读取数据

  7. 7

    Postgres Debezium CDC不会发布对Kafka的更改

  8. 8

    带 kafka 的 Debezium 还是仅嵌入的 Debezium?

  9. 9

    在Kafka Stream中执行异步转换

  10. 10

    Spark Stream Kafka和Hbase Config

  11. 11

    Spring Cloud Stream Kafka记录太大

  12. 12

    Kafka Stream的输出速率与窗口不同

  13. 13

    Spark Stream Kafka和Hbase Config

  14. 14

    Spring Cloud Stream Kafka 错误通道

  15. 15

    Kafka Stream 自定义状态存储

  16. 16

    哪个版本的 Kafka Stream 会更高效?

  17. 17

    Spring Kafka/Spring Cloud Stream 如何保证涉及数据库和 Kafka 的事务性/原子性?

  18. 18

    如果kafka connect的debezium源连接器无法对kafka产生影响怎么办

  19. 19

    Kafka Connect JDBC与Debezium CDC

  20. 20

    使用Spring Cloud Stream Kafka Binder时无法设置groupId和clientId

  21. 21

    Kafka Stream:应用程序重新启动时的Kafka Windowed Stream行为

  22. 22

    Spring Cloud Stream Kafka Stream 与本机 Kafka Stream 应用程序和生产者之间的 Avro 消息不兼容

  23. 23

    Android-Stream无法正确读取数组

  24. 24

    ImageMagick 的 Stream 无法读取 TIFF64?

  25. 25

    Storm从Java中的Kafka读取数据

  26. 26

    如何从flink访问/读取kafka主题数据?

  27. 27

    Spring Cloud Stream在Kafka Streams流程中出现Serde错误

  28. 28

    将Kafka Stream输入输出到控制台?

  29. 29

    spring-cloud-stream-kafka错误处理

热门标签

归档