如何将两个不同的Spout的输出发送到同一Bolt?

里特什·辛哈(Ritesh Sinha)

我有两个卡夫卡喷口,我要将其值发送到同一螺栓。

是否有可能 ?

马蒂亚斯·萨克斯

对的,这是可能的:

TopologyBuilder b = new TopologyBuilder();
b.setSpout("topic_1", new KafkaSpout(...));
b.setSpout("topic_2", new KafkaSpout(...));
b.setBolt("bolt", new MyBolt(...)).shuffleGrouping("topic_1").shuffleGrouping("topic_2");

您也可以使用任何其他分组。

更新:

为了区分使用者螺栓中的元组(即topic_1或topic_2),有两种可能性:

1)您可以使用操作员ID(如@ user-4870385所建议):

if(input.getSourceComponent().equalsIgnoreCase("topic_1")) {
    //do something
} else {
    //do something
}

2)您可以使用流名称(@zenbeni建议)。对于这种情况,两个喷口都需要声明命名的流,而螺栓需要通过流名称连接到喷口:

public class MyKafkaSpout extends KafkaSpout {
  final String streamName;

  public MyKafkaSpout(String stream) {
    this.streamName = stream;
  }

  // other stuff omitted

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // compare KafkaSpout.declareOutputFields(...)
    declarer.declare(streamName, _spoutConfig.scheme.getOutputFields());
  }
}

构建拓扑,现在需要使用流名称:

TopologyBuilder b = new TopologyBuilder();
b.setSpout("topic_1", new MyKafkaSpout("stream_t1"));
b.setSpout("topic_2", new MyKafkaSpout("stream_t2"));
b.setBolt("bolt", new MyBolt(...)).shuffleGrouping("topic_1", "stream_t1").shuffleGrouping("topic_2", "stream_t2");

MyBolt现在可以流名称中区分输入元组:

// in my MyBolt.execute():
if(input.getSourceStreamId().equals("Topic1")) {
  // do something
} else {
  // do something
}

讨论:

虽然使用流名称第二种方法更自然(根据@zenbeni),但一种方法更灵活(IHMO)。流名称直接由spout / bolt声明(即,在编写spout / bolt代码时);与此相反,当拓扑放在一起操作者ID分配(即,在时间喷口/螺栓使用)。

假设我们得到三个螺栓作为类文件(没有源代码)。前两个应该用作生产者,并且都声明具有相同名称的输出流。如果第三个使用者按流区分输入元组,则此方法将无效。即使两个给定的生产者螺栓都声明了不同的输出流名称,预期的输入流名称也可能在使用者螺栓中进行了硬编码,并且可能不匹配。因此,它也不起作用。但是,如果使用者螺栓使用组件名称(即使它们是硬编码的)来区分传入的元组,则可以正确分配期望的组件ID。

当然,有可能从给定的类继承(如果未声明final和覆盖declareOutputFields(...),则可以分配自己的流名称。但是,这是要做的更多工作。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

如何将消息发送到 SCDF 中的两个不同输出通道?

来自分类Dev

如何使用wget访问两个网站,但将第一个网站的输出发送到`/ dev / null`?

来自分类Dev

将两个螺栓的输出发送到Storm中的单个螺栓?

来自分类Dev

如何将输出发送到stderr?

来自分类Dev

如何将标准输出和标准错误发送到两个文件

来自分类Dev

如何将两个不同的ArrayList从Servlet发送到JSP

来自分类Dev

如何将输出发送到标准输出到字符串

来自分类Dev

如何将两个文件发送到具有不同MsgSeqNumber的单个MQ消息中

来自分类Dev

如何将PDF输出发送到浏览器打印选项(CTRL + P)

来自分类Dev

如何将python中的Logger对象的输出发送到文件?

来自分类Dev

编织时如何将输出发送到控制台而不是文档

来自分类Dev

如何将Perforce同步命令的输出发送到文本文件

来自分类Dev

如何将控制台输出发送到我的电子邮件?

来自分类Dev

如何将命令输出发送到GNU屏幕的复制模式缓冲区

来自分类Dev

如何将tcpdump输出发送到数据库?

来自分类Dev

如何将命令的标准输出发送到Expect输入?

来自分类Dev

如何将servlet的json输出发送到jsp?

来自分类Dev

如何将ffmpeg输出发送到帧缓冲区?

来自分类Dev

如何将输出发送到屏幕和邮件?

来自分类Dev

如何将dmesg打印输出发送到ftrace子系统?

来自分类Dev

如何将tFileList输出发送到Talend中的文件?

来自分类Dev

如何将输出发送到日志文件和控制台?

来自分类Dev

如何将转换的输出发送到 bash 中的可写目录?

来自分类Dev

如何将多个命令的输出发送到单个 shell 管道?

来自分类Dev

如何将 VBA 输出发送到 Python 脚本?

来自分类Dev

如何将`time some_command`重复运行的输出发送到文件?

来自分类Dev

无论如何将量角器的输出发送到文件中?

来自分类Dev

bash exec将输出发送到管道,如何?

来自分类Dev

两个或多个类可以将数据发送到同一IntentService吗?

Related 相关文章

  1. 1

    如何将消息发送到 SCDF 中的两个不同输出通道?

  2. 2

    如何使用wget访问两个网站,但将第一个网站的输出发送到`/ dev / null`?

  3. 3

    将两个螺栓的输出发送到Storm中的单个螺栓?

  4. 4

    如何将输出发送到stderr?

  5. 5

    如何将标准输出和标准错误发送到两个文件

  6. 6

    如何将两个不同的ArrayList从Servlet发送到JSP

  7. 7

    如何将输出发送到标准输出到字符串

  8. 8

    如何将两个文件发送到具有不同MsgSeqNumber的单个MQ消息中

  9. 9

    如何将PDF输出发送到浏览器打印选项(CTRL + P)

  10. 10

    如何将python中的Logger对象的输出发送到文件?

  11. 11

    编织时如何将输出发送到控制台而不是文档

  12. 12

    如何将Perforce同步命令的输出发送到文本文件

  13. 13

    如何将控制台输出发送到我的电子邮件?

  14. 14

    如何将命令输出发送到GNU屏幕的复制模式缓冲区

  15. 15

    如何将tcpdump输出发送到数据库?

  16. 16

    如何将命令的标准输出发送到Expect输入?

  17. 17

    如何将servlet的json输出发送到jsp?

  18. 18

    如何将ffmpeg输出发送到帧缓冲区?

  19. 19

    如何将输出发送到屏幕和邮件?

  20. 20

    如何将dmesg打印输出发送到ftrace子系统?

  21. 21

    如何将tFileList输出发送到Talend中的文件?

  22. 22

    如何将输出发送到日志文件和控制台?

  23. 23

    如何将转换的输出发送到 bash 中的可写目录?

  24. 24

    如何将多个命令的输出发送到单个 shell 管道?

  25. 25

    如何将 VBA 输出发送到 Python 脚本?

  26. 26

    如何将`time some_command`重复运行的输出发送到文件?

  27. 27

    无论如何将量角器的输出发送到文件中?

  28. 28

    bash exec将输出发送到管道,如何?

  29. 29

    两个或多个类可以将数据发送到同一IntentService吗?

热门标签

归档