我正在构建以下Kafka Streams拓扑(伪代码):
gK = builder.stream().gropuByKey();
g1 = gK.windowedBy(TimeWindows.of("PT1H")).reduce().mapValues().toStream().mapValues().selectKey();
g2 = gK.reduce().mapValues();
g1.leftJoin(g2).to();
如果您注意到了,这是一种菱形拓扑,从单个输入主题开始,以单个输出主题结束,消息流经两个并行流,最终在最后合并在一起。一种流适用(滚动?)加窗,另一种不适用。流的两个部分都在同一个键上工作(除了窗口中间引入的WindowedKey外)。
我的消息的时间戳是事件时间。也就是说,它们是通过我的自定义配置TimestampExtractor
实现从消息正文中选择的。我的邮件中的实际时间戳是过去的几年。
在我的单元测试中,使用几个输入/输出消息以及在运行时环境(使用实际的Kafka)中,所有这些功能乍一看都很好。
当消息数量开始很大(例如40K)时,似乎出现了问题。
我失败的情况如下:
具有相同键的约40K条记录首先被上传到输入主题中
正如预期的那样,大约有4万个更新来自输出主题
与步骤1)相同但不同的另一个〜40K记录被上传到输入主题中
输出主题中仅约有100个更新,而不是预期的约40K新更新。大约100个更新没有什么特别的,它们的内容似乎是正确的,但仅在某些时间范围内。对于其他时间窗口,即使流逻辑和输入数据应明确生成40K记录,也没有更新。实际上,当我在步骤1)和3)中交换数据集时,我遇到的情况完全相同,第二个数据集有约40K个更新,而第一个数据集有约100个更新。
我可以在TopologyTestDriver
本地使用单元测试轻松重现此问题(但只能在大量输入记录上使用)。
在测试中,我尝试使用禁用缓存StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG
。不幸的是,这没有任何区别。
更新
我同时尝试了reduce()调用和aggregate()调用。在两种情况下问题仍然存在。
我还需要注意的是,StreamsConfig.TOPOLOGY_OPTIMIZATION
设置为StreamsConfig.OPTIMIZE
和不设置时,至少在第一次之前,在调试器中的mapValues()处理函数在前面的reduce()(或gregation())处理函数之前被调用。没想到
不幸的是,尝试了join()和leftJoin()。在调试器中,数据的第二部分根本不会触发“左”流程中的reduce()处理程序,但是会触发“右”流程中的reduce()处理程序。
在我的配置中,如果两个数据集中的数目或记录中的每个数目均为100,问题就不会显现出来,我将得到200条输出消息。当我在每个数据集中将数字增加到200时,我收到的预期消息少于400。因此,目前看来,诸如“旧”窗口之类的东西将被删除,而那些旧窗口的新记录将被流忽略。有可以设置的窗口保留设置,但是使用我使用的默认值,我希望窗口可以保留其状态并保持活动至少12个小时(这远远超出了单元测试的运行时间)。
尝试使用以下窗口存储配置修改左减速机:
Materialized.as(
Stores.inMemoryWindowStore(
"rollup-left-reduce",
Duration.ofDays(5 * 365),
Duration.ofHours(1), false)
)
结果仍然没有差异。
即使只有单个“左”流而没有“右”流,也没有join(),相同的问题仍然存在。看来问题出在我设置的窗口保留设置中。我的输入记录的时间戳(事件时间)跨越2年。第二个数据集再次从2年开始。Kafka Streams中的此位置可确保第二个数据集记录被忽略:
Kafka Streams版本是2.4.0。还使用Confluent依赖版本5.4.0。
我的问题是
经过一段时间的调试后,我找到了问题的原因。
我的输入数据集包含带有2年时间戳记的记录。我正在加载第一个数据集,并从输入数据集中将流的“观察”时间设置为最大时间戳。
第二个数据集的上载以带有时间戳的记录开始,该记录的时间戳距新观察到的时间早2年,导致内部流丢弃消息。如果将Kafka日志记录设置为TRACE级别,则可以看到。
因此,要解决我的问题,我必须为Windows配置保留期和宽限期:
代替
.windowedBy(TimeWindows.of(windowSize))
我必须指定
.windowedBy(TimeWindows.of(windowSize).grace(Duration.ofDays(5 * 365)))
另外,我还必须将reducer存储设置显式配置为:
Materialized.as(
Stores.inMemoryWindowStore(
"rollup-left-reduce",
Duration.ofDays(5 * 365),
windowSize, false)
)
就是这样,输出是预期的。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句