带窗口的Kafka Streams拓扑不会触发状态更改

谢尔盖·谢尔巴科夫(Sergey Shcherbakov)

我正在构建以下Kafka Streams拓扑(伪代码):

gK = builder.stream().gropuByKey();
g1 = gK.windowedBy(TimeWindows.of("PT1H")).reduce().mapValues().toStream().mapValues().selectKey();
g2 = gK.reduce().mapValues();
g1.leftJoin(g2).to();

如果您注意到了,这是一种菱形拓扑,从单个输入主题开始,以单个输出主题结束,消息流经两个并行流,最终在最后合并在一起。一种流适用(滚动?)加窗,另一种不适用。流的两个部分都在同一个键上工作(除了窗口中间引入的WindowedKey外)。

我的消息的时间戳是事件时间。也就是说,它们是通过我的自定义配置TimestampExtractor实现从消息正文中选择的我的邮件中的实际时间戳是过去的几年。

在我的单元测试中,使用几个输入/输出消息以及在运行时环境(使用实际的Kafka)中,所有这些功能乍一看都很好。

当消息数量开始很大(例如40K)时,似乎出现了问题。

我失败的情况如下:

  1. 具有相同键的约40K条记录首先被上传到输入主题中

  2. 正如预期的那样,大约有4万个更新来自输出主题

  3. 与步骤1)相同但不同的另一个〜40K记录被上传到输入主题中

  4. 输出主题中仅约有100个更新,而不是预期的约40K新更新。大约100个更新没有什么特别的,它们的内容似乎是正确的,但仅在某些时间范围内。对于其他时间窗口,即使流逻辑和输入数据应明确生成40K记录,也没有更新。实际上,当我在步骤1)和3)中交换数据集时,我遇到的情况完全相同,第二个数据集有约40K个更新,而第一个数据集有约100个更新。

我可以在TopologyTestDriver本地使用单元测试轻松重现此问题(但只能在大量输入记录上使用)。

在测试中,我尝试使用禁用缓存StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG不幸的是,这没有任何区别。

更新

我同时尝试了reduce()调用和aggregate()调用。在两种情况下问题仍然存在。

我还需要注意的是,StreamsConfig.TOPOLOGY_OPTIMIZATION设置为StreamsConfig.OPTIMIZE和不设置时,至少在第一次之前在调试器中的mapValues()处理函数在前面的reduce()(或gregation())处理函数之前被调用没想到

不幸的是,尝试了join()和leftJoin()。在调试器中,数据的第二部分根本不会触发“左”流程中的reduce()处理程序,但是会触发“右”流程中的reduce()处理程序。

在我的配置中,如果两个数据集中的数目或记录中的每个数目均为100,问题就不会显现出来,我将得到200条输出消息。当我在每个数据集中将数字增加到200时,我收到的预期消息少于400。因此,目前看来,诸如“旧”窗口之类的东西将被删除,而那些旧窗口的新记录将被流忽略。有可以设置的窗口保留设置,但是使用我使用的默认值,我希望窗口可以保留其状态并保持活动至少12个小时(这远远超出了单元测试的运行时间)。

尝试使用以下窗口存储配置修改左减速机:

Materialized.as(
    Stores.inMemoryWindowStore(
        "rollup-left-reduce",
        Duration.ofDays(5 * 365),
        Duration.ofHours(1), false)
)

结果仍然没有差异。

即使只有单个“左”流而没有“右”流,也没有join(),相同的问题仍然存在。看来问题出在我设置的窗口保留设置中。我的输入记录的时间戳(事件时间)跨越2年。第二个数据集再次从2年开始。Kafka Streams中的此位置可确保第二个数据集记录被忽略:

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java#L125

Kafka Streams版本是2.4.0。还使用Confluent依赖版本5.4.0。

我的问题是

  • 发生这种行为的原因可能是什么?
  • 我是否错过了流拓扑中的任何内容?
  • 这样的拓扑是否可以正常工作?
谢尔盖·谢尔巴科夫(Sergey Shcherbakov)

经过一段时间的调试后,我找到了问题的原因。

我的输入数据集包含带有2年时间戳记的记录。我正在加载第一个数据集,并从输入数据集中将流的“观察”时间设置为最大时间戳。

第二个数据集的上载以带有时间戳的记录开始,该记录的时间戳距新观察到的时间早2年,导致内部流丢弃消息。如果将Kafka日志记录设置为TRACE级别,则可以看到。

因此,要解决我的问题,我必须为Windows配置保留期和宽限期:

代替

.windowedBy(TimeWindows.of(windowSize))

我必须指定

.windowedBy(TimeWindows.of(windowSize).grace(Duration.ofDays(5 * 365)))

另外,我还必须将r​​educer存储设置显式配置为:

 Materialized.as(
    Stores.inMemoryWindowStore(
        "rollup-left-reduce",
        Duration.ofDays(5 * 365),
        windowSize, false)
)

就是这样,输出是预期的。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

子组件无法触发状态更改

来自分类Dev

更新URL而不触发状态更改

来自分类Dev

通过其同级触发状态更改时,子组件不会重新呈现

来自分类Dev

如何使用react-router触发状态更改?

来自分类Dev

Kafka Streams-抑制直到窗口结束(不关闭)

来自分类Dev

Kafka Streams拓扑使用不同的密钥但使用相同的架构

来自分类Dev

是否指定了Kafka Streams拓扑的处理顺序?

来自分类Dev

这种特殊的Kafka Streams拓扑是否引入了竞争条件?

来自分类Dev

Kafka Streams内部主题的更改复制因子会影响kafka Streams吗?流媒体会处于错误状态吗?

来自分类Dev

Kafka Streams - 旧状态聚合

来自分类Dev

Kafka Streams transform() 状态存储

来自分类Dev

Kafka Streams:由于恢复期间更改日志状态而无法重新平衡

来自分类Dev

触发状态“selectedOptions2”时如何映射()

来自分类Dev

如何发送时间窗口化KTable的最终kafka-streams聚合结果?

来自分类Dev

WPF更改窗口模式状态

来自分类Dev

iframe顶部窗口状态更改

来自分类Dev

反应窗口卸载事件不会触发

来自分类Dev

jQuery窗口调整大小不会触发

来自分类Dev

为什么我的 Kafka Streams 拓扑不能正确重放/重新处理?

来自分类Dev

带滚动窗口的SQL聚合

来自分类Dev

窗口更改时如何触发事件?

来自分类Dev

窗口更改时如何触发事件?

来自分类Dev

更改形状时窗口不会更新

来自分类Dev

调整窗口大小不会更改Java值

来自分类Dev

卡夫卡(Kafka)Storm喷口更改拓扑并从旧偏移消耗

来自分类Dev

更改zookeeper中的Kafka主机名条目并在风暴拓扑重启时保持它

来自分类Dev

使用ui-sref在按钮标签中触发状态

来自分类Dev

阻止浏览器在使用visibilityChange事件触发状态更新时向上滚动

来自分类Dev

如何在Reactjs中从另一个组件触发状态

Related 相关文章

  1. 1

    子组件无法触发状态更改

  2. 2

    更新URL而不触发状态更改

  3. 3

    通过其同级触发状态更改时,子组件不会重新呈现

  4. 4

    如何使用react-router触发状态更改?

  5. 5

    Kafka Streams-抑制直到窗口结束(不关闭)

  6. 6

    Kafka Streams拓扑使用不同的密钥但使用相同的架构

  7. 7

    是否指定了Kafka Streams拓扑的处理顺序?

  8. 8

    这种特殊的Kafka Streams拓扑是否引入了竞争条件?

  9. 9

    Kafka Streams内部主题的更改复制因子会影响kafka Streams吗?流媒体会处于错误状态吗?

  10. 10

    Kafka Streams - 旧状态聚合

  11. 11

    Kafka Streams transform() 状态存储

  12. 12

    Kafka Streams:由于恢复期间更改日志状态而无法重新平衡

  13. 13

    触发状态“selectedOptions2”时如何映射()

  14. 14

    如何发送时间窗口化KTable的最终kafka-streams聚合结果?

  15. 15

    WPF更改窗口模式状态

  16. 16

    iframe顶部窗口状态更改

  17. 17

    反应窗口卸载事件不会触发

  18. 18

    jQuery窗口调整大小不会触发

  19. 19

    为什么我的 Kafka Streams 拓扑不能正确重放/重新处理?

  20. 20

    带滚动窗口的SQL聚合

  21. 21

    窗口更改时如何触发事件?

  22. 22

    窗口更改时如何触发事件?

  23. 23

    更改形状时窗口不会更新

  24. 24

    调整窗口大小不会更改Java值

  25. 25

    卡夫卡(Kafka)Storm喷口更改拓扑并从旧偏移消耗

  26. 26

    更改zookeeper中的Kafka主机名条目并在风暴拓扑重启时保持它

  27. 27

    使用ui-sref在按钮标签中触发状态

  28. 28

    阻止浏览器在使用visibilityChange事件触发状态更新时向上滚动

  29. 29

    如何在Reactjs中从另一个组件触发状态

热门标签

归档