Apache Flink:如何处理三个流

Cheng Jiang

我想在一个操作符中接收和处理三个流。例如在Storm中实现的代码如下:

builder.setBolt("C_bolt", C_bolt(), parallelism_hint) .fieldsGrouping("A_bolt", "TRAINING", new Fields("word")) .fieldsGrouping("B_bolt", "ANALYSIS", new Fields("word")) .allGrouping("A_bolt", "SUM");

弗林克,加工SUM stream(A_bolt's SideOutput)TRAINING stream(A_bolt)实现的:

SingleOutputStreamOperator<Tuple3<String, Integer, Boolean>> A_bolt;
DataStream<Tuple2<Integer, Integer>> Sum = A_bolt.getSideOutput(outputTag).broadcast();
DataStream<Tuple3<String, String, Integer>> B_bolt;
DataStream<String> C_bolt= A_bolt
                        .keyBy(new KeySelector<Tuple3<String,Integer,Boolean>, String>() {
                                    @Override
                                    public String getKey(Tuple3<String,Integer,Boolean> in) throws Exception {
                                        return in.f0;
                                    }
                                })
                        .connect(Sum)
                        .flatMap(new Process())
                        .setParallelism(parallelism);

但是不知道怎么加ANALYSIS stream(B_bolt)感谢您的帮助。

大卫·安德森

Flink 只支持一输入和二输入的流操作符。您的选择是:

  1. 使用union()创建一个包含来自所有三个流的所有元素的合并流(它们必须都是相同的类型,但您可以使用Either 来协助处理)。
  2. 使用 coFlatMap 组合两个流后,将初步结果连接到第三个流,使用另一个 coFlatMap(或 coProcessFunction)完成处理。

或者这两种技术的组合在您的情况下更可取。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

一个带有三个插槽的TaskManager是否与Apache Flink中三个带有一个插槽的TaskManager相同?

来自分类Dev

如何处理大型查找表,这些表在Apache Flink中很少更新

来自分类Dev

如何处理涉及三个向量的索引?

来自分类Dev

“使用Apache Flink进行流处理”如何从IntelliJ运行书代码?

来自分类Dev

Apache Apex如何处理背压?

来自分类Dev

如何处理Apache Camel NoRouteToHostException异常?

来自分类Dev

如何使用Apache flink处理乱序事件?

来自分类Dev

Apache HttpClient 不执行第三个请求

来自分类Dev

如何调试Apache Flink?

来自分类Dev

Apache Flink-预测处理

来自分类Dev

php和apache如何处理多个请求?

来自分类Dev

Apache Spark如何处理python多线程问题?

来自分类Dev

EIP / Apache的骆驼 - 如何处理消息的同时,但原子每组?

来自分类Dev

Apache,如何处理标准URL等未知的php文件?

来自分类Dev

Apache Flink导入Scala API流扩展

来自分类Dev

Apache Flink:创建滞后的数据流

来自分类Dev

Apache Flink导入Scala API流扩展

来自分类Dev

Apache Flink如何实现迭代?

来自分类Dev

使用Apache Flink流处理缓冲转换后的消息(例如1000条计数)

来自分类Dev

如何处理Flink流中的未来事件?

来自分类Dev

Apache Beam流处理事件时间

来自分类Dev

在 apache nifi 中批量处理流文件

来自分类Dev

我如何处理流中的三元

来自分类Dev

如何处理流?

来自分类Dev

从Apache Flink查询数据

来自分类Dev

XmlInputFormat用于Apache Flink

来自分类Dev

zipWithIndex在Apache Flink上

来自分类Dev

Apache Flink与Elasticsearch的集成

来自分类Dev

Apache Flink:分步执行