我有一个KStream与主题数据TO1是这样的:
T1-KEY -> {T1}
T2-KEY -> {T2}
和一个KTable,构造如下:
我使用org.apache.kafka.streams.StreamsBuilder创建KTable一些话题TO2看起来像这样:
A1-KEY -> { "A1", "Set": [
{"B1", "Rel": "T1"},
{"B2", "Rel": "T1"}
]
}
..
然后将流平面映射并按 Key st 分组,结果KTable如下所示:
T1 -> { ["B1", "B2"] }
稍后,现在主题to2 中出现以下消息:
A1-KEY -> { "A1", "Set": [
{"B2", "Rel": "T1"}
]
}
现在我希望我的KTable能够反映这些变化,看起来像这样:
T1 -> { ["B2"] }
但它看起来像这样:
T1 -> { ["B1", "B2"] }
我注意到,在我Aggregator<Tx-KEY, Bx, Set<Bx>>
给出的最后一个参数中,["B1", "B2"]
即使当我在聚合之前偷看时,我也只得到一个 match "B2"
。
我理解聚合错误还是这里发生了什么?
编辑
我想我把范围缩小:显然,聚集的Initializer
只是呼吁非常第一次-之后,使得聚集始终接收last aggregate
作为最后一个参数,如
@Override
public Set<Bx> apply(Tx-KEY, Bx value, Set<Bx> aggregate) {
}
whereSet<Bx> aggregate
是[]
第一次调用(通过初始化程序创建),但["B1", "B2"]
用于第二次调用。
有任何想法吗?
编辑 2
public class MyAggregator implements Aggregator<Tx-KEY, Bx, Set<Bx>> {
@Override
public Set<Bx> apply(Tx-KEY key, Bx value, Set<Bx> aggregate) {
aggregate.add(value);
return aggregate;
}
}
编辑 3
我不能只平面地图,因为我必须组合多个 Ax 元素,例如
A1-KEY -> { "A1", "Set": [
{"B1", "Rel": "T1"}
]
},
A2-KEY -> { "A2", "Set": [
{"B2", "Rel": "T1"}
]
},
...
然后我期待一些像这样的团体
T1 -> { ["B1", "B2"] }
在下一次迭代中,当消息
A1-KEY -> { "A1", "Set": [
{"B1", "Rel": "T1"}
]
}
到货了
T1 -> { ["B1"] }
..
注意在你的聚合器中你是如何只向聚合集中添加元素的。有了这个逻辑,你的集合(对于给定的键)永远不会缩小。我认为在这种情况下,您将流压平了太多。我建议您不要将其展平到您的消息具有以下形式的程度(Tx-KEY key, Bx value)
,而是使它们始终保持其固定形式:(Tx-KEY key, Set<Bx> value)
。你根本不需要聚合。为了实现这一点,我建议您转换输入集
"Set": [
{"B1", "Rel": "T1"},
{"B2", "Rel": "T1"}
]
进入
T1 -> { ["B1", "B2"] }
通过在 KStream flatmap 方法调用中使用标准 Java 代码(集合或流 api)按“Rel”字段分组,以便您只在Set<Bx>
KStream 上发出带有-typed 值的消息,而不是Bx
单独发出 -typed 值。
如果您提供当前 flatmap 实现的代码,我们很乐意详细说明。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句