我一直在开发基于 java kafka-streams API 的应用程序,其目标是处理来自一个 kafka 主题的数据流,并将其生成到另一个主题中。
看起来,每当我开始使用 kafka-streams 应用程序生成消息时,文件句柄只会在我正在使用的 kafka 代理上保持打开状态,并且它们永远不会关闭,这意味着最终 kafka 服务器最终会打开太多文件,并且kafka 和 zookeeper 守护进程崩溃。
我正在kafka-streams-1.0.1
为 Java使用API jar,并在 JDK 11 上运行。kafka 集群是 Kafka 版本 1.0.0。
我的应用程序的配置包括以下 kafka 生产者配置:
batch.size
:设置为 100,000 条消息。linger.ms
: 设置为 1,000 毫秒。buffer.memory
:设置为相当于 5 兆字节的字节。流处理本身非常简单,由以下组成:
stream.map((k,v) -> handle(k,v)).filter((k,v) -> v != null).to(outgoingTopic);
我将不胜感激你们可能提出的任何建议。
如果消息可能导致时间戳乱序,那么覆盖 Kafka 流时间戳提取器似乎不是一个好主意。恢复到默认的时间戳提取器后,我已经全部修复了
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句