异步/等待for循环NodeJS不阻止循环执行

Sreejith sreeji

我知道旧学校for循环会以传统方式工作-等待await完成结果。但是在我的用例中,我需要从local / s3中读取文件并逐行处理它,并且对于每一行,我都需要调用一个外部API。通常,我await在循环内使用,因为它们都在lambda内运行,并且我不想使用所有内存来并行运行它。在这里,我正在使用一种stream.on()方法读取文件,为了在其中使用await,我需要在read方法中添加async,如下所示:

stream.on('data',async () =>{
         while(data=stream.read()!==null){
           console.log('line');
           const requests = getRequests(); // sync code,no pblms
           for(let i=0;i<requests.length;i++){
             const result = await apiCall(request[i);
             console.log('result from api')
             const finalResult = await anotherapiCall(result.data);
             }
        }
});

这是可行的,但是不能保证处理行的顺序。我需要全部同步。有什么帮助吗?

完整的代码

async function processSOIFileLocal (options, params) {
console.log('Process SOI file');

const readStream = byline.createStream(fs.createReadStream(key));
readStream.setEncoding('utf8');
const pattern = /^UHL\s|^UTL\s/;
const regExp = new RegExp(pattern);
readStream.on('readable', () => {
    let line;
    while (null !== (line = readStream.read())) {
        if (!regExp.test(line.toString())) {
            totalRecordsCount++;
            dataObject = soiParser(line);
            const { id } = dataObject;
            const XMLRequests = createLoSTRequestXML(
                options,
                { mapping: event.mapping, row: dataObject }
            );
            console.log('Read line');
            console.log(id);
            try {
                for (let i = 0;i < XMLRequests.length;i++) {
                    totalRequestsCount++;
                    console.log('Sending request');
                    const response = await sendLoSTRequest(
                        options,
                        { data: XMLRequests[i],
                            url: LOST_URL }
                    );
                    console.log("got response");
                    const responseObj = await xml2js.
                        parseStringPromise(response.data);
                    if (Object.keys(responseObj).indexOf('errors') !== -1) {
               
                        fs.writeFileSync(`${ERR_DIR}/${generateKey()}-${id}.xml`, response.data);
                        failedRequestsCount++;
                    } else {
                        successRequestsCount++;
                        console.log('Response from the Lost Server');
                        console.log(response[i].data);
                    }
                }
            } catch (err) {
                console.log(err);
            }
        }
    }
})
    .on('end', () => {
        console.log('file processed');
        console.log(`
        ************************************************
        Total Records Processed:${totalRecordsCount}
        Total Requests Sent: ${totalRequestsCount}
        Success Requests: ${successRequestsCount}
        Failed Requests: ${failedRequestsCount}
        ************************************************
        `);
    });
}

async function sendLoSTRequest (options, params) {
const { axios } = options;
const { url, data } = params;
if (url) {
    return  axios.post(url, data);
// eslint-disable-next-line no-else-return
} else {
    console.log('URL is not found');
    return null;
}

}

代码需要这样流动:以同步方式读取一行,处理该行,并将该行转换为每个成员调用API的两个成员的数组,并在行完成后进行处理,查找另一行,所有操作按顺序进行

更新:有一种解决方法..但它会触发stream.end()而不等待流完成读取

async function processSOIFileLocal (options, params) {
console.log('Process SOI file');
const { ERR_DIR, fs, xml2js, LOST_URL, byline, event } = options;
const { key } = params;
const responseObject = {};
let totalRecordsCount = 0;
let totalRequestsCount = 0;
let failedRequestsCount = 0;
let successRequestsCount = 0;
let dataObject = {};
const queue = (() => {
    let q = Promise.resolve();
    return fn => (q = q.then(fn));
})();
const readStream = byline.createStream(fs.createReadStream(key));
readStream.setEncoding('utf8');
const pattern = /^UHL\s|^UTL\s/;
const regExp = new RegExp(pattern);
readStream.on('readable', () => {
    let line;
    while (null !== (line = readStream.read())) {
        if (!regExp.test(line.toString())) {
            totalRecordsCount++;
            dataObject = soiParser(line);
            const { id } = dataObject;
            const XMLRequests = createLoSTRequestXML(
                options,
                { mapping: event.mapping, row: dataObject }
            );
                // eslint-disable-next-line no-loop-func
            queue(async () => {
                try {
                    for (let i = 0;i < XMLRequests.length;i++) {
                        console.log('Sending request');
                        console.log(id);
                        totalRequestsCount++;
                        const response = await sendLoSTRequest(
                            options,
                            { data: XMLRequests[i],
                                url: LOST_URL }
                        );
                        console.log('got response');
                        const responseObj = await xml2js.
                            parseStringPromise(response.data);
                        if (Object.keys(responseObj).indexOf('errors') !== -1) {
                            // console.log('Response have the error:');
                            // await handleError(options, { err: responseObj, id });
                            failedRequestsCount++;
                            fs.writeFileSync(`${ERR_DIR}/${generateKey()}-${id}.xml`, response.data);
                        } else {
                            console.log('Response from the Lost Server');
                            console.log(response[i].data);
                            successRequestsCount++;
                        }
                    }
                } catch (err) {
                    console.log(err);
                }
            });
        }
    }
})
    .on('end', () => {
        console.log('file processed');
        console.log(`
            ************************************************
            Total Records Processed:${totalRecordsCount}
            Total Requests Sent: ${totalRequestsCount}
            Success Requests: ${successRequestsCount}
            Failed Requests: ${failedRequestsCount}
            ************************************************
            `);
        Object.assign(responseObject, {
            failedRequestsCount,
            successRequestsCount,
            totalRecordsCount,
            totalRequestsCount
        });
    });

}

谢谢

Sreejith sreeji

如果有人想要一种同步处理文件的解决方案,即逐行读取并执行一些异步调用,则建议使用内置的流转换。在那里,我们可以创建一个转换函数,并在完成时返回一个回调。这将帮助任何面对此问题的人。Through2是一个小的npm库,也可以用于相同的库。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

异步和等待与循环

来自分类Dev

异步并等待循环

来自分类Dev

异步/等待导致循环

来自分类Dev

for循环正在等待执行

来自分类Dev

异步循环不尊重异步

来自分类Dev

系统verilog:如果循环内部总是阻止不执行

来自分类Dev

在 for 循环 nodejs 中异步

来自分类Dev

EF循环以异步等待更新

来自分类Dev

异步/等待循环行为

来自分类Dev

异步客户端套接字c#。如何在不阻止程序运行的情况下循环等待来自服务器的信息?

来自分类Dev

在nodeJS中使用异步模块进行涉及for循环的同步执行?

来自分类Dev

异步任务中的while循环阻止另一个异步任务的执行

来自分类Dev

javascript:异步函数问题(异步等待循环)

来自分类Dev

for循环不执行

来自分类Dev

异步/等待和Parallel.For循环

来自分类Dev

等待异步方法在for循环中完成

来自分类Dev

从异步等待保存对象时的无限循环

来自分类Dev

对于循环,异步等待,承诺-Javascript

来自分类Dev

使用异步内部循环等待提供错误

来自分类Dev

Python异步等待定时循环

来自分类Dev

在for循环中等待异步功能

来自分类Dev

在for循环中修改对象(异步/等待)

来自分类Dev

异步/等待和Parallel.For循环

来自分类Dev

VBScript For循环不执行if语句?

来自分类Dev

while循环不执行scanf()

来自分类Dev

异步/等待阻止第二个承诺执行

来自分类Dev

如何使用嵌套的foreach循环等待异步/等待完成

来自分类Dev

阻止Sublime Text执行无限循环

来自分类Dev

如何阻止线程继续执行无限循环