我正在编写一个 dockerized Java Spring 应用程序,它使用 Apache Storm v1.1.2、Kafka v0.11.0.1、Zookeeper 3.4.6、Eureka 和 Cloud-Config,所有这些都在由 Docker-Compose 编排的 Docker 容器中。
我用 KafkaSpout 收到的元组有一个“值”字段,它是一个 protobuf 对象。我使用自定义反序列化器将我的对象从中取出以进行处理。
我有一个基本的应用程序,我有一个bolt,它打印传入的消息并根据protobuf对象中字段的值将它们路由到其他某些bolt。我还有 LocalCluster、Config 和 TopologyBuilder 作为 Spring Beans 工作。
目前,我在 PostContruct 中设置了所有螺栓,但我需要能够动态添加螺栓以根据 protobuf 对象的其他字段过滤传入消息并执行基本聚合功能(最大/最小/窗口平均值)。
我想用 REST 控制器来做到这一点,但我怎么能在不丢失数据的情况下停止和启动拓扑?我也不想通过从头开始收听 Kafka 主题来重新启动拓扑,因为该系统将收到极高的负载。
这篇文章看起来很有希望,但我绝对希望整个过程自动化,所以我不会进入 Zookeeper https://community.hortonworks.com/articles/550/unofficial-storm-and-kafka-best-practices-guide .html
如何在代码中编辑现有拓扑以动态添加新螺栓?
你不能。Storm 拓扑一旦提交就是静态的。如果您需要根据元组中的字段改变处理,最好的选择是预先提交您需要的所有螺栓。然后,您可以通过使用一个或多个检查元组的螺栓来改变元组通过拓扑的路径,并根据元组内容发送到特定流。
例如制作一个 SplitterBolt
public void execute(Tuple input) {
if (tuple.getIntegerByField("theDecider") == 1) {
collector.emit("onlyOnes", tuple.getValues());
} else {
collector.emit("others", tuple.getValues());
}
}
您在拓扑构建代码中的位置将类似于
builder.setSpout("kafka-spout", ...);
builder.setBolt("splitter", new SplitterBolt()).shuffleGrouping("kafka-spout");
builder.setBolt("countOnes", new CounterBolt()).shuffleGrouping("splitter", "onlyOnes");
builder.setBolt("countOthers", new CounterBolt()).shuffleGrouping("splitter", "others");
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句