使用Apache Flink流处理缓冲转换后的消息(例如1000条计数)

火花8

我正在使用Apache Flink进行流处理。

订阅来自源(例如:Kafka,AWS Kinesis Data Streams)的消息,然后使用Flink运算符对流数据应用转换,聚合等之后,我想要缓冲最终消息(例如:计数为1000)并将每个批次发布到对外部REST API的单个请求。

如何在Apache Flink中实现缓冲机制(批量创建每1000条记录)?

Flink pipileine:流式源->使用运算符转换/减少->缓冲1000条消息->发布到REST API

感谢你的帮助!

克里斯·格肯

我将创建一个接收器,其状态将保留传入的消息。当计数足够高(1000)时,接收器将发送批处理。该状态可以在内存中(例如,持有消息ArrayList的实例变量),但是您应该使用检查点,以便在出现某种故障时可以恢复该状态。

当接收器具有检查点状态时,它需要实现CheckpointedFunction(在org.apache.flink.streaming.api.checkpoint中),这意味着您需要向接收器添加两个方法:

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {

    checkpointedState.clear();

    // HttpSinkStateItem is a user-written class 
    // that just holds a collection of messages (Strings, in this case)
    //
    // Buffer is declared as ArrayList<String>

    checkpointedState.add(new HttpSinkStateItem(buffer));

}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {

    // Mix and match different kinds of states as needed:
    //   - Use context.getOperatorStateStore() to get basic (non-keyed) operator state
    //        - types are list and union        
    //   - Use context.getKeyedStateStore() to get state for the current key (only for processing keyed streams)
    //        - types are value, list, reducing, aggregating and map
    //   - Distinguish between state data using state name (e.g. "HttpSink-State")      

    ListStateDescriptor<HttpSinkStateItem> descriptor =
        new ListStateDescriptor<>(
            "HttpSink-State",
            HttpSinkStateItem.class);

    checkpointedState = context.getOperatorStateStore().getListState(descriptor);

    if (context.isRestored()) {

        for (HttpSinkStateItem item: checkpointedState.get()) {
            buffer = new ArrayList<>(item.getPending());  
        }

    }       

}

如果计数未达到阈值,您也可以在接收器中使用计时器(如果输入流是键控/分区的)以定期发送。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

如何使用Apache flink处理乱序事件?

来自分类Dev

使用 Apache Flink 创建 CEP

来自分类Dev

使用Twitter流构建图形并使用Apache Flink查询

来自分类Dev

“使用Apache Flink进行流处理”如何从IntelliJ运行书代码?

来自分类Dev

使用Python将Apache Kafka与Apache Spark流集成

来自分类Dev

Apache Flink:使用 mapPartition 按顺序处理数据

来自分类Dev

使用Apache Flink从Web获取JSON元素

来自分类Dev

如何在Apache Flink中使用ActiveMQ?

来自分类Dev

在Apache-Flink中使用C / C ++

来自分类Dev

使用 EMR 的 Apache flink 的 AWS 配置

来自分类Dev

使用Apache Camel和JAVA处理ActiveMQ消息

来自分类Dev

如何使用 Apache Camel 交换消息?

来自分类Dev

使用消息时的 Apache Kafka 清理

来自分类Dev

使用 Apache Spark 处理来自 Web 的文件

来自分类Dev

Apache Flink:如何处理三个流

来自分类Dev

无法使用Flink CLI将流部署到Apache Flink的HA集群

来自分类Dev

Apache Flink:不能将 writeAsCsv() 与子类元组的数据流一起使用

来自分类Dev

在 apache camel 中使用 rabbitmq 来处理大文件/消息时减少内存使用

来自分类Dev

在 MVC 上使用基于 Microsoft 站点的 WordprocessingDocument 后收到一条消息“内存流不可扩展”

来自分类Dev

失败消息:使用Apache Flink 1.11时,检查点在完成之前已过期

来自分类Dev

如何使用Apache-Flink的TaskMangers实现容错(恢复)?

来自分类Dev

使用Apache Flink将数据推送到S3

来自分类Dev

如何使用 Apache Flink 分析事件序列以触发操作?

来自分类Dev

使用Apache Camel AWS-KINESIS终端节点,如何在Kinesis流中检查消息?

来自分类Dev

Apache NiFi:使用ExecuteScript处理器处理多个csv

来自分类Dev

使用PySpark的直接Kafka流(Apache Spark 1.6)

来自分类Dev

使用Apache骆驼观察网络上的反应流

来自分类Dev

使用Apache mod_wsgi进行HTTP流

来自分类Dev

使用Apache JMeter进行视频流和负载测试

Related 相关文章

  1. 1

    如何使用Apache flink处理乱序事件?

  2. 2

    使用 Apache Flink 创建 CEP

  3. 3

    使用Twitter流构建图形并使用Apache Flink查询

  4. 4

    “使用Apache Flink进行流处理”如何从IntelliJ运行书代码?

  5. 5

    使用Python将Apache Kafka与Apache Spark流集成

  6. 6

    Apache Flink:使用 mapPartition 按顺序处理数据

  7. 7

    使用Apache Flink从Web获取JSON元素

  8. 8

    如何在Apache Flink中使用ActiveMQ?

  9. 9

    在Apache-Flink中使用C / C ++

  10. 10

    使用 EMR 的 Apache flink 的 AWS 配置

  11. 11

    使用Apache Camel和JAVA处理ActiveMQ消息

  12. 12

    如何使用 Apache Camel 交换消息?

  13. 13

    使用消息时的 Apache Kafka 清理

  14. 14

    使用 Apache Spark 处理来自 Web 的文件

  15. 15

    Apache Flink:如何处理三个流

  16. 16

    无法使用Flink CLI将流部署到Apache Flink的HA集群

  17. 17

    Apache Flink:不能将 writeAsCsv() 与子类元组的数据流一起使用

  18. 18

    在 apache camel 中使用 rabbitmq 来处理大文件/消息时减少内存使用

  19. 19

    在 MVC 上使用基于 Microsoft 站点的 WordprocessingDocument 后收到一条消息“内存流不可扩展”

  20. 20

    失败消息:使用Apache Flink 1.11时,检查点在完成之前已过期

  21. 21

    如何使用Apache-Flink的TaskMangers实现容错(恢复)?

  22. 22

    使用Apache Flink将数据推送到S3

  23. 23

    如何使用 Apache Flink 分析事件序列以触发操作?

  24. 24

    使用Apache Camel AWS-KINESIS终端节点,如何在Kinesis流中检查消息?

  25. 25

    Apache NiFi:使用ExecuteScript处理器处理多个csv

  26. 26

    使用PySpark的直接Kafka流(Apache Spark 1.6)

  27. 27

    使用Apache骆驼观察网络上的反应流

  28. 28

    使用Apache mod_wsgi进行HTTP流

  29. 29

    使用Apache JMeter进行视频流和负载测试

热门标签

归档