在Kafka Streams中的聚合器中访问TimeWindow属性

谜2684

我想在一个时间窗口内使用Kafka-Streams流式传输一个主题的最新记录,并且我想将输出记录的时间戳设置为等于该记录在其上记录的时间窗口的结尾。

我的问题是我无法在聚合器内部访问窗口属性。

这是我现在拥有的代码:

    KS0
        .groupByKey()
        .windowedBy(
            TimeWindows.of(Duration.ofSeconds(this.periodicity)).grace(Duration.ZERO)
        )
        .aggregate(
            Constants::getInitialAssetTimeValue,
            this::aggregator,
            Materialized.<AssetKey, AssetTimeValue, WindowStore<Bytes, byte[]>>as(this.getStoreName()) /* state store name */
                .withValueSerde(assetTimeValueSerde)   /* serde for aggregate value */
        )
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream()
        .peek((key, value) -> log.info("key={}, value={}", key, value.toString()))
        .to(this.toTopic);

我正在使用的聚合函数就是这个:

private AssetTimeValue aggregator(AssetKey aggKey, AssetTimeValue newValue, AssetTimeValue aggValue){

 // I want to do something like that, but this only works with windowed Keys to which I do  
 // not have access through the aggregator
 // windowEndTime = aggKey.window().endTime().getEpochSecond();   

    return AssetTimeValue.newBuilder()
            .setTimestamp(windowEndTime)
            .setName(newValue.getName())
            .setValue(newValue.getValue())
            .build();
}

非常感谢您的帮助!

马蒂亚斯·萨克斯

您只能通过Processor API操纵时间戳。但是,您可以轻松使用DSL中嵌入的Processor API。

对于您的情况,您可以transform()toStream()之间插入一个to()在内,Transformercontext.forward(key, value, To.all().withTimestamp(...))可以设置新的时间戳。另外,您将return null在最后(null意味着不发出任何记录,因为您已经context.forward为此目的使用过)。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

在Apache Kafka Streams中的特定分区上聚合

来自分类Dev

Kafka Streams - 旧状态聚合

来自分类Dev

Kafka Streams中数据混乱的原因

来自分类Dev

Kafka Streams 中的消息键长

来自分类Dev

Kafka Streams-不可预测的聚合结果

来自分类Dev

Kafka Streams如何获取kafka标头

来自分类Dev

kafka-streams警告kafka连接故障

来自分类Dev

停止Kafka Streams应用

来自分类Dev

Kafka Streams KGroupedTable恢复

来自分类Dev

Kafka Streams JoinWindow 的数据

来自分类Dev

远程访问在kubernetes中运行的Kafka

来自分类Dev

C ++中的Kafka Consumer

来自分类Dev

在 REST 代理中设置 Kafka Producer 属性

来自分类Dev

kafka 流中的处理器节点

来自分类Dev

阅读Kafka Streams DSL中已分区的主题

来自分类Dev

Kafka Streams中未确定的“错误发送记录”的原因

来自分类Dev

带有 Kafka Streams 的 Scala Embedded Kafka 中的生产者错误

来自分类Dev

Kafka Streams 不会使用聚合值重新启动

来自分类Dev

Kafka Streams:动态配置RocksDb

来自分类Dev

Kafka Streams 检测丢失的记录

来自分类Dev

Kafka Streams transform() 状态存储

来自分类常见问题

NiFi中的Kafka oauth 2.0

来自分类Dev

在Kafka Connect日志中屏蔽?

来自分类Dev

kafka从群组中删除连接

来自分类Dev

NiFi中的Kafka oauth 2.0

来自分类Dev

Kafka中的Worker Queue选项

来自分类Dev

Kafka KStreams 中的“for”循环支持

来自分类Dev

Windows 10 中的 Kafka 设置

来自分类Dev

Kafka 中的事件处理确认