如何使用代码中的新螺栓更新现有的 Storm 拓扑?

NaptownCSC

我正在编写一个 dockerized Java Spring 应用程序,它使用 Apache Storm v1.1.2、Kafka v0.11.0.1、Zookeeper 3.4.6、Eureka 和 Cloud-Config,所有这些都在由 Docker-Compose 编排的 Docker 容器中。

我用 KafkaSpout 收到的元组有一个“值”字段,它是一个 protobuf 对象。我使用自定义反序列化器将我的对象从中取出以进行处理。

我有一个基本的应用程序,我有一个bolt,它打印传入的消息并根据protobuf对象中字段的值将它们路由到其他某些bolt。我还有 LocalCluster、Config 和 TopologyBuilder 作为 Spring Beans 工作。

目前,我在 PostContruct 中设置了所有螺栓,但我需要能够动态添加螺栓以根据 protobuf 对象的其他字段过滤传入消息并执行基本聚合功能(最大/最小/窗口平均值)。

我想用 REST 控制器来做到这一点,但我怎么能在不丢失数据的情况下停止和启动拓扑?我也不想通过从头开始收听 Kafka 主题来重新启动拓扑,因为该系统将收到极高的负载。

这篇文章看起来很有希望,但我绝对希望整个过程自动化,所以我不会进入 Zookeeper https://community.hortonworks.com/articles/550/unofficial-storm-and-kafka-best-practices-guide .html

如何在代码中编辑现有拓扑以动态添加新螺栓?

Stig Rohde Døssing

你不能。Storm 拓扑一旦提交就是静态的。如果您需要根据元组中的字段改变处理,最好的选择是预先提交您需要的所有螺栓。然后,您可以通过使用一个或多个检查元组的螺栓来改变元组通过拓扑的路径,并根据元组内容发送到特定流。

例如制作一个 SplitterBolt

public void execute(Tuple input) {
  if (tuple.getIntegerByField("theDecider") == 1) {
    collector.emit("onlyOnes", tuple.getValues());
  } else {
    collector.emit("others", tuple.getValues());
  }
}

您在拓扑构建代码中的位置将类似于

builder.setSpout("kafka-spout", ...);
builder.setBolt("splitter", new SplitterBolt()).shuffleGrouping("kafka-spout");
builder.setBolt("countOnes", new CounterBolt()).shuffleGrouping("splitter", "onlyOnes");
builder.setBolt("countOthers", new CounterBolt()).shuffleGrouping("splitter", "others");

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

Storm拓扑中的可选流

来自分类Dev

Storm拓扑中的可选流

来自分类Dev

无法从Storm教程中运行Storm-starter拓扑

来自分类Dev

向Storm提交拓扑

来自分类Dev

如何在Intellij IDEA中构建和运行Storm拓扑

来自分类Dev

代码覆盖:Apache Storm 拓扑的代码覆盖工具

来自分类Dev

在Apache Storm拓扑中,喷嘴和螺栓之间是否有内部队列?

来自分类Dev

如何阻止Apache Storm消息在拓扑结构的中间重播?

来自分类Dev

在Storm TrackedTopology单元测试中运行Trident拓扑

来自分类Dev

无法使用kafka-storm将拓扑提交给Apache Storm

来自分类Dev

Apache Storm远程拓扑提交

来自分类Dev

如何将 python bolt 集成到使用 Storm Crawler SDK 构建的拓扑中

来自分类Dev

使用Storm时,如何从拓扑上下文访问对象?

来自分类Dev

如何在不使用Web仪表板的情况下分析Apache Storm拓扑?

来自分类Dev

使用Storm时,如何从拓扑上下文访问对象?

来自分类Dev

如何首次使用 apache kafka 集成部署 Storm-core 拓扑?

来自分类Dev

如何在单个Storm拓扑中正确读取多个Kafka主题

来自分类Dev

用于Storm拓扑的Mongo连接池

来自分类Dev

用kafka集成Storm的字数拓扑

来自分类Dev

在Redis上编写的Trident或Storm拓扑

来自分类Dev

Apache Storm:在Bolt内获取拓扑名称

来自分类Dev

Storm UI被杀死后显示拓扑

来自分类Dev

在 Storm 集群上提交拓扑时出错

来自分类Dev

Storm 拓扑可以包含循环吗?

来自分类Dev

向 Storm 拓扑添加交互元素

来自分类Dev

Apache Storm 拓扑上的 Sigar UnsatisfiedLinkError

来自分类Dev

关于Apache Storm,能否在多个工人并行状态下处理Trident拓扑中的一批?

来自分类Dev

编译字数拓扑时,我在Storm ui中有一个错误。该错误出现在螺栓状态的“拆分”部分

来自分类Dev

Storm拓扑事务和数据库事务

Related 相关文章

  1. 1

    Storm拓扑中的可选流

  2. 2

    Storm拓扑中的可选流

  3. 3

    无法从Storm教程中运行Storm-starter拓扑

  4. 4

    向Storm提交拓扑

  5. 5

    如何在Intellij IDEA中构建和运行Storm拓扑

  6. 6

    代码覆盖:Apache Storm 拓扑的代码覆盖工具

  7. 7

    在Apache Storm拓扑中,喷嘴和螺栓之间是否有内部队列?

  8. 8

    如何阻止Apache Storm消息在拓扑结构的中间重播?

  9. 9

    在Storm TrackedTopology单元测试中运行Trident拓扑

  10. 10

    无法使用kafka-storm将拓扑提交给Apache Storm

  11. 11

    Apache Storm远程拓扑提交

  12. 12

    如何将 python bolt 集成到使用 Storm Crawler SDK 构建的拓扑中

  13. 13

    使用Storm时,如何从拓扑上下文访问对象?

  14. 14

    如何在不使用Web仪表板的情况下分析Apache Storm拓扑?

  15. 15

    使用Storm时,如何从拓扑上下文访问对象?

  16. 16

    如何首次使用 apache kafka 集成部署 Storm-core 拓扑?

  17. 17

    如何在单个Storm拓扑中正确读取多个Kafka主题

  18. 18

    用于Storm拓扑的Mongo连接池

  19. 19

    用kafka集成Storm的字数拓扑

  20. 20

    在Redis上编写的Trident或Storm拓扑

  21. 21

    Apache Storm:在Bolt内获取拓扑名称

  22. 22

    Storm UI被杀死后显示拓扑

  23. 23

    在 Storm 集群上提交拓扑时出错

  24. 24

    Storm 拓扑可以包含循环吗?

  25. 25

    向 Storm 拓扑添加交互元素

  26. 26

    Apache Storm 拓扑上的 Sigar UnsatisfiedLinkError

  27. 27

    关于Apache Storm,能否在多个工人并行状态下处理Trident拓扑中的一批?

  28. 28

    编译字数拓扑时,我在Storm ui中有一个错误。该错误出现在螺栓状态的“拆分”部分

  29. 29

    Storm拓扑事务和数据库事务

热门标签

归档