在 Kafka Streams 应用程序中保持本地状态存储更新

大师107

我在 Kafka 中有一个日志压缩主题,我正在 KTable 中读取它并创建一个商店。每当我更新键控消息时,我都看不到应用程序运行时的最新状态。但是当我重新启动流应用程序时,我能够看到更新的状态。如何在流应用程序运行时获取更新的状态而无需重新启动。

这是我用来迭代状态值的代码(在主函数中)。

final String queryableStore = metaTable.queryableStoreName();

    ReadOnlyKeyValueStore<String,String> metaTableView = null;
    try {
        metaTableView = waitUntilStoreIsQueryable(queryableStore,QueryableStoreTypes.keyValueStore(),streams);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    metaTableView.all().forEachRemaining(row -> logger.info("Key: "+row.key+" Value: "+row.value));
马蒂亚斯·J·萨克斯

不确定你是如何运行代码的。但是,如果我理解正确,您希望logger.info打印每个更新吗?如果是这种情况,那么这将不起作用,因为.all()迭代 KTable 一次然后终止。请注意,您在代码示例中使用的“交互式查询”是为一次性查找而设计的。您需要在您自己的代码中重新运行查询——当然,您每次都会获得所有记录。

如果您想持续获取 KTable 更新(并且只有更新),您需要直接使用 KTable。例如,你可以做

KTable table = builder.table(...);
table.toStream().foreach(...);

对 KTable 的每次更新都会产生一个由 处理的输出记录foreach()注意记录缓存;您可能希望禁用缓存以获取所有更新,否则,某些更新可能无法转发,因为它们会受到缓存的影响。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

Kafka Streams transform() 状态存储

来自分类Dev

停止Kafka Streams应用

来自分类Dev

Kafka Streams - 旧状态聚合

来自分类Dev

Kafka Streams State Stores是否适合处理大量密钥和数据的有状态应用程序?

来自分类Dev

Kafka Streams 应用程序在 kafka 服务器上打开过多文件

来自分类Dev

为什么我必须使用Kafka Streams配置状态存储

来自分类Dev

Kafka Streams - 减少大型状态存储的内存占用

来自分类Dev

Kafka Streams KGroupedTable恢复

来自分类Dev

Kafka Streams JoinWindow 的数据

来自分类常见问题

Kafka Streams应用程序无休止的重新平衡

来自分类Dev

终止使用Kafka-Streams和MongoDB的Spring Boot应用程序

来自分类Dev

Scala Jar文件对于Kafka Streams应用程序无法正常工作

来自分类Dev

Kafka Streams应用程序是否会提交用于填充全局KTable的主题的提示?

来自分类Dev

使用kafka-streams活页夹测试Spring Cloud Stream应用程序

来自分类Dev

在Kafka Streams应用程序中关闭或不关闭RocksDB Cache和WriteBufferManager

来自分类Dev

如何在 Spring Cloud Stream Kafka Streams 应用程序中使用 StateStoreBuilder 添加 StateStore

来自分类Dev

带窗口的Kafka Streams拓扑不会触发状态更改

来自分类Dev

Kafka Streams内部主题的更改复制因子会影响kafka Streams吗?流媒体会处于错误状态吗?

来自分类Dev

在Kafka Streams中重建状态存储是否会将重复的记录传播到下游主题?

来自分类Dev

如何在多个磁盘上分发Kafka-Streams状态存储

来自分类Dev

Kafka Streams-添加新的源流后使用现有状态存储

来自分类Dev

在 Kafka Streams 中查询全局状态存储会引发 Null 异常

来自分类Dev

Kafka Streams:使用 at_least_once 时对状态存储的保存顺序有任何保证吗?

来自分类Dev

Kafka Streams如何获取kafka标头

来自分类Dev

kafka-streams警告kafka连接故障

来自分类Dev

Kafka Streams:动态配置RocksDb

来自分类Dev

Kafka Streams 检测丢失的记录

来自分类Dev

Kafka Streams 持久存储错误:状态存储,可能已迁移到另一个实例

来自分类Dev

Kafka Streams:由于恢复期间更改日志状态而无法重新平衡

Related 相关文章

  1. 1

    Kafka Streams transform() 状态存储

  2. 2

    停止Kafka Streams应用

  3. 3

    Kafka Streams - 旧状态聚合

  4. 4

    Kafka Streams State Stores是否适合处理大量密钥和数据的有状态应用程序?

  5. 5

    Kafka Streams 应用程序在 kafka 服务器上打开过多文件

  6. 6

    为什么我必须使用Kafka Streams配置状态存储

  7. 7

    Kafka Streams - 减少大型状态存储的内存占用

  8. 8

    Kafka Streams KGroupedTable恢复

  9. 9

    Kafka Streams JoinWindow 的数据

  10. 10

    Kafka Streams应用程序无休止的重新平衡

  11. 11

    终止使用Kafka-Streams和MongoDB的Spring Boot应用程序

  12. 12

    Scala Jar文件对于Kafka Streams应用程序无法正常工作

  13. 13

    Kafka Streams应用程序是否会提交用于填充全局KTable的主题的提示?

  14. 14

    使用kafka-streams活页夹测试Spring Cloud Stream应用程序

  15. 15

    在Kafka Streams应用程序中关闭或不关闭RocksDB Cache和WriteBufferManager

  16. 16

    如何在 Spring Cloud Stream Kafka Streams 应用程序中使用 StateStoreBuilder 添加 StateStore

  17. 17

    带窗口的Kafka Streams拓扑不会触发状态更改

  18. 18

    Kafka Streams内部主题的更改复制因子会影响kafka Streams吗?流媒体会处于错误状态吗?

  19. 19

    在Kafka Streams中重建状态存储是否会将重复的记录传播到下游主题?

  20. 20

    如何在多个磁盘上分发Kafka-Streams状态存储

  21. 21

    Kafka Streams-添加新的源流后使用现有状态存储

  22. 22

    在 Kafka Streams 中查询全局状态存储会引发 Null 异常

  23. 23

    Kafka Streams:使用 at_least_once 时对状态存储的保存顺序有任何保证吗?

  24. 24

    Kafka Streams如何获取kafka标头

  25. 25

    kafka-streams警告kafka连接故障

  26. 26

    Kafka Streams:动态配置RocksDb

  27. 27

    Kafka Streams 检测丢失的记录

  28. 28

    Kafka Streams 持久存储错误:状态存储,可能已迁移到另一个实例

  29. 29

    Kafka Streams:由于恢复期间更改日志状态而无法重新平衡

热门标签

归档