在 apache nifi 中批量处理流文件

马赫沙999

我编写了自定义 nifi 处理器,它尝试批处理输入流文件。

但是,它的行为似乎并不像预期的那样。这是发生的事情:

我在服务器上复制粘贴一些文件。FethFromServerProcessor从服务器获取这些文件并将其放入queue1. MyCustomProcessorqueue1. 在其方法batchSizeMyCustomProcessor和内部定义了属性onTrigger(),我queue1通过执行以下操作从当前批处理中获取所有流文件

session.get(context.getProperty(batchSize).asInteger())

第一行onTrigger()创建时间戳并在所有流文件上添加此时间戳。所以批处理中的所有文件都应该具有相同的时间戳。然而,这并没有发生。通常第一个流文件得到一个时间戳,其余的流文件得到另一个时间戳。

似乎当FetchFromServerProcessor从服务器获取第一个文件并将其放入 , 时queue1MyCustomProcessor会被触发并从队列中获取所有文件。顺便说一句,碰巧以前有单个文件,在此批处理中作为唯一文件被拾取。MyCustomProcessor处理此文件时,FetchFromServerProcessor已从服务器获取所有文件并将它们放入queue1. 因此,在处理第一个文件后,MyCustomProcessor将所有文件放入queue1并形成第二批,而我希望在单个批次中提取所有文件。

如何避免形成两批?我看到人们在这种情况下讨论等待通知:1 , 2但我无法从这些帖子中快速理解。有人可以使用等待通知处理器给我最少的步骤来实现这一点,或者有人可以向我指出最小教程,该教程提供使用等待通知处理器的分步程序?等待通知模式也是解决我解释过的批处理相关问题的标准方法吗?或者有没有其他标准方法来完成这项工作?

安迪

听起来好像这个批处理大小是传入流文件所需的数量CustomProcessor,那么为什么不写CustomProcessor#onTrigger()如下:

@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    final ComponentLog logger = getLogger();
    // Try to get n flowfiles from incoming queue
    final Integer desiredFlowfileCount = context.getProperty(batchSize).asInteger();
    final int queuedFlowfileCount = session.getQueueSize().getObjectCount();
    if (queuedFlowfileCount < desiredFlowfileCount) {
        // There are not yet n flowfiles queued up, so don't try to run again immediately
        if (logger.isDebugEnabled()) {
            logger.debug("Only {} flowfiles queued; waiting for {}", new Object[]{queuedFlowfileCount, desiredFlowfileCount});
        }
        context.yield();
        return;
    }

    // If we're here, we do have at least n queued flowfiles
    List<FlowFile> flowfiles = session.get(desiredFlowfileCount);

    try {
        // TODO: Perform work on all flowfiles
        flowfiles = flowfiles.stream().map(f -> session.putAttribute(f, "timestamp", "my static timestamp value")).collect(Collectors.toList());
        session.transfer(flowfiles, REL_SUCCESS);

        // If extending AbstractProcessor, this is handled for you and you don't have to explicitly commit
        session.commit();
    } catch (Exception e) {
        logger.error("Helpful error message");
        if (logger.isDebugEnabled()) {
            logger.error("Further stacktrace: ", e);
        }
        // Penalize the flowfiles if appropriate (also done for you if extending AbstractProcessor and an exception is thrown from this method
        session.rollback(true);
        //  --- OR ---
        // Transfer to failure if they can't be retried
        session.transfer(flowfiles, REL_FAILURE);
    }
}

stream如果不熟悉Java 8语法,可以用这个替换:

        for (int i = 0; i < flowfiles.size(); i++) {
            // Write the same timestamp value onto all flowfiles
            FlowFile f = flowfiles.get(i);
            flowfiles.set(i, session.putAttribute(f, "timestamp", "my timestamp value"));
        }

惩罚(告诉处理器延迟在特定流文件上执行工作)和让步(告诉处理器等待一段时间以再次尝试执行任何工作)之间语义很重要。

您可能还希望自定义处理器上@TriggerSerially注释确保您没有运行多个线程,从而可能出现竞争条件。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

Related 相关文章

热门标签

归档