Kafka Streams 应用程序在 kafka 服务器上打开过多文件

斯塔夫·萨德

我一直在开发基于 java kafka-streams API 的应用程序,其目标是处理来自一个 kafka 主题的数据流,并将其生成到另一个主题中。

看起来,每当我开始使用 kafka-streams 应用程序生成消息时,文件句柄只会在我正在使用的 kafka 代理上保持打开状态,并且它们永远不会关闭,这意味着最终 kafka 服务器最终会打开太多文件,并且kafka 和 zookeeper 守护进程崩溃。

我正在kafka-streams-1.0.1为 Java使用API jar,并在 JDK 11 上运行。kafka 集群是 Kafka 版本 1.0.0。

我的应用程序的配置包括以下 kafka 生产者配置:

  • batch.size:设置为 100,000 条消息。
  • linger.ms: 设置为 1,000 毫秒。
  • buffer.memory:设置为相当于 5 兆字节的字节。

流处理本身非常简单,由以下组成:

stream.map((k,v) -> handle(k,v)).filter((k,v) -> v != null).to(outgoingTopic);

我将不胜感激你们可能提出的任何建议。

斯塔夫·萨德

如果消息可能导致时间戳乱序,那么覆盖 Kafka 流时间戳提取器似乎不是一个好主意。恢复到默认的时间戳提取器后,我已经全部修复了

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

停止Kafka Streams应用

来自分类Dev

Scala Jar文件对于Kafka Streams应用程序无法正常工作

来自分类常见问题

Kafka Streams应用程序无休止的重新平衡

来自分类Dev

终止使用Kafka-Streams和MongoDB的Spring Boot应用程序

来自分类Dev

Kafka Streams State Stores是否适合处理大量密钥和数据的有状态应用程序?

来自分类Dev

Kafka Streams应用程序是否会提交用于填充全局KTable的主题的提示?

来自分类Dev

使用kafka-streams活页夹测试Spring Cloud Stream应用程序

来自分类Dev

在Kafka Streams应用程序中关闭或不关闭RocksDB Cache和WriteBufferManager

来自分类Dev

在 Kafka Streams 应用程序中保持本地状态存储更新

来自分类Dev

如何在 Spring Cloud Stream Kafka Streams 应用程序中使用 StateStoreBuilder 添加 StateStore

来自分类Dev

Kafka KStream应用程序-临时文件清理

来自分类Dev

Kafka Streams KGroupedTable恢复

来自分类Dev

Kafka Streams JoinWindow 的数据

来自分类Dev

Kafka Streams如何获取kafka标头

来自分类Dev

kafka-streams警告kafka连接故障

来自分类Dev

将消息从Kafka路由到连接到应用程序服务器集群的Web套接字客户端

来自分类Dev

Kafka Streams:动态配置RocksDb

来自分类Dev

Kafka Streams - 旧状态聚合

来自分类Dev

Kafka Streams 检测丢失的记录

来自分类Dev

Kafka Streams transform() 状态存储

来自分类Dev

在Apache Kafka Streams中的特定分区上聚合

来自分类Dev

在Kafka Streams中的聚合器中访问TimeWindow属性

来自分类Dev

Kafka Streams - 任务/分区/处理器关系

来自分类Dev

对导致服务器因“文件打开过多”而挂起的Java程序进行故障诊断

来自分类Dev

Kafka Streams 使用的 RocksDB 文件名含义

来自分类Dev

如何忽略从同一主题读取和写入不同事件类型的 Kafka Streams 应用程序中的某些类型的消息

来自分类Dev

运行与Flask应用程序交互的Kafka使用者

来自分类Dev

应用程序日志保存在kafka代理上的哪里?

来自分类Dev

Kafka 流字数统计应用程序

Related 相关文章

  1. 1

    停止Kafka Streams应用

  2. 2

    Scala Jar文件对于Kafka Streams应用程序无法正常工作

  3. 3

    Kafka Streams应用程序无休止的重新平衡

  4. 4

    终止使用Kafka-Streams和MongoDB的Spring Boot应用程序

  5. 5

    Kafka Streams State Stores是否适合处理大量密钥和数据的有状态应用程序?

  6. 6

    Kafka Streams应用程序是否会提交用于填充全局KTable的主题的提示?

  7. 7

    使用kafka-streams活页夹测试Spring Cloud Stream应用程序

  8. 8

    在Kafka Streams应用程序中关闭或不关闭RocksDB Cache和WriteBufferManager

  9. 9

    在 Kafka Streams 应用程序中保持本地状态存储更新

  10. 10

    如何在 Spring Cloud Stream Kafka Streams 应用程序中使用 StateStoreBuilder 添加 StateStore

  11. 11

    Kafka KStream应用程序-临时文件清理

  12. 12

    Kafka Streams KGroupedTable恢复

  13. 13

    Kafka Streams JoinWindow 的数据

  14. 14

    Kafka Streams如何获取kafka标头

  15. 15

    kafka-streams警告kafka连接故障

  16. 16

    将消息从Kafka路由到连接到应用程序服务器集群的Web套接字客户端

  17. 17

    Kafka Streams:动态配置RocksDb

  18. 18

    Kafka Streams - 旧状态聚合

  19. 19

    Kafka Streams 检测丢失的记录

  20. 20

    Kafka Streams transform() 状态存储

  21. 21

    在Apache Kafka Streams中的特定分区上聚合

  22. 22

    在Kafka Streams中的聚合器中访问TimeWindow属性

  23. 23

    Kafka Streams - 任务/分区/处理器关系

  24. 24

    对导致服务器因“文件打开过多”而挂起的Java程序进行故障诊断

  25. 25

    Kafka Streams 使用的 RocksDB 文件名含义

  26. 26

    如何忽略从同一主题读取和写入不同事件类型的 Kafka Streams 应用程序中的某些类型的消息

  27. 27

    运行与Flask应用程序交互的Kafka使用者

  28. 28

    应用程序日志保存在kafka代理上的哪里?

  29. 29

    Kafka 流字数统计应用程序

热门标签

归档