Kafka Streams - 旧状态聚合

其他

我有一个KStream与主题数据TO1是这样的:

T1-KEY -> {T1}
T2-KEY -> {T2}

和一个KTable,构造如下:

我使用org.apache.kafka.streams.StreamsBuilder创建KTable一些话题TO2看起来像这样:

A1-KEY -> { "A1", "Set": [
                          {"B1", "Rel": "T1"},
                          {"B2", "Rel": "T1"}
                         ]
          } 

..

然后将流平面映射并按 Key st 分组,结果KTable如下所示:

T1 -> { ["B1", "B2"] }

稍后,现在主题to2 中出现以下消息

A1-KEY -> { "A1", "Set": [
                          {"B2", "Rel": "T1"}
                         ]
          } 

现在我希望我的KTable能够反映这些变化,看起来像这样:

T1 -> { ["B2"] }

但它看起来像这样:

T1 -> { ["B1", "B2"] }

我注意到,在我Aggregator<Tx-KEY, Bx, Set<Bx>>给出的最后一个参数中,["B1", "B2"]即使当我在聚合之前偷看时,我也只得到一个 match "B2"

我理解聚合错误还是​​这里发生了什么?

编辑

我想我把范围缩小:显然,聚集的Initializer只是呼吁非常第一次-之后,使得聚集始终接收last aggregate作为最后一个参数,如

@Override
public Set<Bx> apply(Tx-KEY, Bx value, Set<Bx> aggregate) {

}

whereSet<Bx> aggregate[]第一次调用(通过初始化程序创建),但["B1", "B2"]用于第二次调用。

有任何想法吗?

编辑 2

public class MyAggregator implements Aggregator<Tx-KEY, Bx, Set<Bx>> {

    @Override
    public Set<Bx> apply(Tx-KEY key, Bx value, Set<Bx> aggregate) {
        aggregate.add(value);
        return aggregate;
    }
}

编辑 3

我不能只平面地图,因为我必须组合多个 Ax 元素,例如

A1-KEY -> { "A1", "Set": [
                      {"B1", "Rel": "T1"}
                     ]
          },
A2-KEY -> { "A2", "Set": [
                      {"B2", "Rel": "T1"}
                     ]
          },
...

然后我期待一些像这样的团体

T1 -> { ["B1", "B2"] }

在下一次迭代中,当消息

A1-KEY -> { "A1", "Set": [
                      {"B1", "Rel": "T1"}
                     ]
          }

到货了

T1 -> { ["B1"] }

..

米哈尔·博罗维茨基

注意在你的聚合器中你是如何只向聚合集中添加元素的。有了这个逻辑,你的集合(对于给定的键)永远不会缩小。我认为在这种情况下,您将流压平了太多。我建议您不要将其展平到您的消息具有以下形式的程度(Tx-KEY key, Bx value),而是使它们始终保持其固定形式:(Tx-KEY key, Set<Bx> value)你根本不需要聚合。为了实现这一点,我建议您转换输入集

"Set": [
     {"B1", "Rel": "T1"},
     {"B2", "Rel": "T1"}
]

进入

T1 -> { ["B1", "B2"] }

通过在 KStream flatmap 方法调用中使用标准 Java 代码(集合或流 api)按“Rel”字段分组,以便您只在Set<Bx>KStream 上发出带有-typed 值的消息,而不是Bx单独发出 -typed 值。

如果您提供当前 flatmap 实现的代码,我们很乐意详细说明。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

Kafka Streams transform() 状态存储

来自分类Dev

Kafka Streams-不可预测的聚合结果

来自分类Dev

在Apache Kafka Streams中的特定分区上聚合

来自分类Dev

在Kafka Streams中的聚合器中访问TimeWindow属性

来自分类Dev

Kafka Streams 不会使用聚合值重新启动

来自分类Dev

为什么我必须使用Kafka Streams配置状态存储

来自分类Dev

带窗口的Kafka Streams拓扑不会触发状态更改

来自分类Dev

Kafka Streams - 减少大型状态存储的内存占用

来自分类Dev

Kafka Streams内部主题的更改复制因子会影响kafka Streams吗?流媒体会处于错误状态吗?

来自分类Dev

停止Kafka Streams应用

来自分类Dev

Kafka Streams KGroupedTable恢复

来自分类Dev

Kafka Streams JoinWindow 的数据

来自分类Dev

如何发送时间窗口化KTable的最终kafka-streams聚合结果?

来自分类Dev

Kafka Streams如何获取kafka标头

来自分类Dev

kafka-streams警告kafka连接故障

来自分类Dev

Kafka Streams:动态配置RocksDb

来自分类Dev

Kafka Streams 检测丢失的记录

来自分类Dev

在Kafka Streams中重建状态存储是否会将重复的记录传播到下游主题?

来自分类Dev

Kafka Streams State Stores是否适合处理大量密钥和数据的有状态应用程序?

来自分类Dev

如何在多个磁盘上分发Kafka-Streams状态存储

来自分类Dev

Kafka Streams-添加新的源流后使用现有状态存储

来自分类Dev

Kafka Streams:由于恢复期间更改日志状态而无法重新平衡

来自分类Dev

在 Kafka Streams 中查询全局状态存储会引发 Null 异常

来自分类Dev

在 Kafka Streams 应用程序中保持本地状态存储更新

来自分类Dev

Kafka Streams:使用 at_least_once 时对状态存储的保存顺序有任何保证吗?

来自分类Dev

Kafka Streams 持久存储错误:状态存储,可能已迁移到另一个实例

来自分类Dev

Kafka-Streams-加入前过滤GlobalKTable

来自分类Dev

Kafka Streams中数据混乱的原因

来自分类Dev

Kafka Streams:第n个事件的动作

Related 相关文章

  1. 1

    Kafka Streams transform() 状态存储

  2. 2

    Kafka Streams-不可预测的聚合结果

  3. 3

    在Apache Kafka Streams中的特定分区上聚合

  4. 4

    在Kafka Streams中的聚合器中访问TimeWindow属性

  5. 5

    Kafka Streams 不会使用聚合值重新启动

  6. 6

    为什么我必须使用Kafka Streams配置状态存储

  7. 7

    带窗口的Kafka Streams拓扑不会触发状态更改

  8. 8

    Kafka Streams - 减少大型状态存储的内存占用

  9. 9

    Kafka Streams内部主题的更改复制因子会影响kafka Streams吗?流媒体会处于错误状态吗?

  10. 10

    停止Kafka Streams应用

  11. 11

    Kafka Streams KGroupedTable恢复

  12. 12

    Kafka Streams JoinWindow 的数据

  13. 13

    如何发送时间窗口化KTable的最终kafka-streams聚合结果?

  14. 14

    Kafka Streams如何获取kafka标头

  15. 15

    kafka-streams警告kafka连接故障

  16. 16

    Kafka Streams:动态配置RocksDb

  17. 17

    Kafka Streams 检测丢失的记录

  18. 18

    在Kafka Streams中重建状态存储是否会将重复的记录传播到下游主题?

  19. 19

    Kafka Streams State Stores是否适合处理大量密钥和数据的有状态应用程序?

  20. 20

    如何在多个磁盘上分发Kafka-Streams状态存储

  21. 21

    Kafka Streams-添加新的源流后使用现有状态存储

  22. 22

    Kafka Streams:由于恢复期间更改日志状态而无法重新平衡

  23. 23

    在 Kafka Streams 中查询全局状态存储会引发 Null 异常

  24. 24

    在 Kafka Streams 应用程序中保持本地状态存储更新

  25. 25

    Kafka Streams:使用 at_least_once 时对状态存储的保存顺序有任何保证吗?

  26. 26

    Kafka Streams 持久存储错误:状态存储,可能已迁移到另一个实例

  27. 27

    Kafka-Streams-加入前过滤GlobalKTable

  28. 28

    Kafka Streams中数据混乱的原因

  29. 29

    Kafka Streams:第n个事件的动作

热门标签

归档