我正在使用Apache Flink进行流处理。
订阅来自源(例如:Kafka,AWS Kinesis Data Streams)的消息,然后使用Flink运算符对流数据应用转换,聚合等之后,我想要缓冲最终消息(例如:计数为1000)并将每个批次发布到对外部REST API的单个请求。
如何在Apache Flink中实现缓冲机制(批量创建每1000条记录)?
Flink pipileine:流式源->使用运算符转换/减少->缓冲1000条消息->发布到REST API
感谢你的帮助!
我将创建一个接收器,其状态将保留传入的消息。当计数足够高(1000)时,接收器将发送批处理。该状态可以在内存中(例如,持有消息ArrayList的实例变量),但是您应该使用检查点,以便在出现某种故障时可以恢复该状态。
当接收器具有检查点状态时,它需要实现CheckpointedFunction(在org.apache.flink.streaming.api.checkpoint中),这意味着您需要向接收器添加两个方法:
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkpointedState.clear();
// HttpSinkStateItem is a user-written class
// that just holds a collection of messages (Strings, in this case)
//
// Buffer is declared as ArrayList<String>
checkpointedState.add(new HttpSinkStateItem(buffer));
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// Mix and match different kinds of states as needed:
// - Use context.getOperatorStateStore() to get basic (non-keyed) operator state
// - types are list and union
// - Use context.getKeyedStateStore() to get state for the current key (only for processing keyed streams)
// - types are value, list, reducing, aggregating and map
// - Distinguish between state data using state name (e.g. "HttpSink-State")
ListStateDescriptor<HttpSinkStateItem> descriptor =
new ListStateDescriptor<>(
"HttpSink-State",
HttpSinkStateItem.class);
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
if (context.isRestored()) {
for (HttpSinkStateItem item: checkpointedState.get()) {
buffer = new ArrayList<>(item.getPending());
}
}
}
如果计数未达到阈值,您也可以在接收器中使用计时器(如果输入流是键控/分区的)以定期发送。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句