我在 Kafka 中有一个日志压缩主题,我正在 KTable 中读取它并创建一个商店。每当我更新键控消息时,我都看不到应用程序运行时的最新状态。但是当我重新启动流应用程序时,我能够看到更新的状态。如何在流应用程序运行时获取更新的状态而无需重新启动。
这是我用来迭代状态值的代码(在主函数中)。
final String queryableStore = metaTable.queryableStoreName();
ReadOnlyKeyValueStore<String,String> metaTableView = null;
try {
metaTableView = waitUntilStoreIsQueryable(queryableStore,QueryableStoreTypes.keyValueStore(),streams);
} catch (InterruptedException e) {
e.printStackTrace();
}
metaTableView.all().forEachRemaining(row -> logger.info("Key: "+row.key+" Value: "+row.value));
不确定你是如何运行代码的。但是,如果我理解正确,您希望logger.info
打印每个更新吗?如果是这种情况,那么这将不起作用,因为.all()
迭代 KTable 一次然后终止。请注意,您在代码示例中使用的“交互式查询”是为一次性查找而设计的。您需要在您自己的代码中重新运行查询——当然,您每次都会获得所有记录。
如果您想持续获取 KTable 更新(并且只有更新),您需要直接使用 KTable。例如,你可以做
KTable table = builder.table(...);
table.toStream().foreach(...);
对 KTable 的每次更新都会产生一个由 处理的输出记录foreach()
。注意记录缓存;您可能希望禁用缓存以获取所有更新,否则,某些更新可能无法转发,因为它们会受到缓存的影响。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句