我想在一个时间窗口内使用Kafka-Streams流式传输一个主题的最新记录,并且我想将输出记录的时间戳设置为等于该记录在其上记录的时间窗口的结尾。
我的问题是我无法在聚合器内部访问窗口属性。
这是我现在拥有的代码:
KS0
.groupByKey()
.windowedBy(
TimeWindows.of(Duration.ofSeconds(this.periodicity)).grace(Duration.ZERO)
)
.aggregate(
Constants::getInitialAssetTimeValue,
this::aggregator,
Materialized.<AssetKey, AssetTimeValue, WindowStore<Bytes, byte[]>>as(this.getStoreName()) /* state store name */
.withValueSerde(assetTimeValueSerde) /* serde for aggregate value */
)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
.peek((key, value) -> log.info("key={}, value={}", key, value.toString()))
.to(this.toTopic);
我正在使用的聚合函数就是这个:
private AssetTimeValue aggregator(AssetKey aggKey, AssetTimeValue newValue, AssetTimeValue aggValue){
// I want to do something like that, but this only works with windowed Keys to which I do
// not have access through the aggregator
// windowEndTime = aggKey.window().endTime().getEpochSecond();
return AssetTimeValue.newBuilder()
.setTimestamp(windowEndTime)
.setName(newValue.getName())
.setValue(newValue.getValue())
.build();
}
非常感谢您的帮助!
您只能通过Processor API操纵时间戳。但是,您可以轻松使用DSL中嵌入的Processor API。
对于您的情况,您可以transform()
在toStream()
和之间插入一个to()
。在内,Transformer
您context.forward(key, value, To.all().withTimestamp(...))
可以设置新的时间戳。另外,您将return null
在最后(null
意味着不发出任何记录,因为您已经context.forward
为此目的使用过)。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句