这是一个更大的过程的一部分,我将其简化为节点v14.4.0中的最小,可重复的示例。在这段代码中,它从for
循环内部什么都不输出。
我在控制台中仅看到以下输出:
before for() loop
finished
finally
done
该for await (const line1 of rl1)
循环永远不会进入for
循环-它只是跳过就在它:
const fs = require('fs');
const readline = require('readline');
const { once } = require('events');
async function test(file1, file2) {
try {
const stream1 = fs.createReadStream(file1);
await once(stream1, 'open');
const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});
const stream2 = fs.createReadStream(file2);
await once(stream2, 'open');
const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});
console.log('before for() loop');
for await (const line1 of rl1) {
console.log(line1);
}
console.log('finished');
} finally {
console.log('finally');
}
}
test("data/numbers.txt", "data/letters.txt").then(() => {
console.log(`done`);
}).catch(err => {
console.log('Got rejected promise:', err);
})
但是,如果删除其中任何一条await once(stream, 'open')
语句,则for
循环将完全执行预期的操作(列出rl1
文件的所有行)。因此,显然,在与流之间的readline接口中,异步迭代器存在一些计时问题。任何想法可能会发生什么。任何想法可能导致此问题或如何解决吗?
仅供参考,await once(stream, 'open')
是因为异步迭代器中的另一个错误,该错误在打开文件时不会拒绝,而在无法打开文件的情况下await once(stream, 'open')
会导致您正确拒绝(实质上是对打开文件进行预检查) 。
如果您想知道为什么存在stream2代码,它将在较大的项目中使用,但是我已将此示例简化为最小的,可重现的示例,并且只需要大量的代码即可演示该问题。
编辑:在尝试一个稍有不同的实现时,我发现如果将两个once(stream, "open")
调用合并到一个中Promise.all()
,则它将起作用。因此,这可行:
const fs = require('fs');
const readline = require('readline');
const { once } = require('events');
async function test(file1, file2) {
try {
const stream1 = fs.createReadStream(file1);
const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});
const stream2 = fs.createReadStream(file2);
const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});
// pre-flight file open to catch any open errors here
// because of existing bug in async iterator with file open errors
await Promise.all([once(stream1, "open"), once(stream2, "open")]);
console.log('before for() loop');
for await (const line1 of rl1) {
console.log(line1);
}
console.log('finished');
} finally {
console.log('finally');
}
}
test("data/numbers.txt", "data/letters.txt").then(() => {
console.log(`done`);
}).catch(err => {
console.log('Got rejected promise:', err);
});
显然,这不应该确切地对您等待文件打开的方式敏感。某处存在一些计时错误。我想在readline或readStream上找到该错误并将其归档。有任何想法吗?
事实证明,潜在的问题是readline.createInterface()
,在调用它时,将立即添加一个data
事件侦听器(此处为代码参考)并恢复流以开始流流动。
input.on('data', ondata);
和
input.resume();
然后,在ondata
侦听器中,它解析行数据,并在找到行时在此处触发line
事件。
for (let n = 0; n < lines.length; n++)
this._onLine(lines[n]);
但是,在我的示例中,readline.createInterface()
在调用时间与创建异步迭代器(侦听line
事件)之间还有其他异步事件发生。因此,line
事件正在发出,但没有人在听它们。
因此,为了正常工作readline.createInterface()
,要求在侦听line
事件之后必须同步添加任何要侦听事件的内容,readline.createInterface()
否则会出现争用情况并且line
事件可能会丢失。
在我的原始代码示例中,一种可靠的解决方法是readline.createInterface()
直到完成后才调用await once(...)
。然后,异步迭代器将在readline.createInterface()
调用后立即同步创建。
const fs = require('fs');
const readline = require('readline');
const { once } = require('events');
async function test(file1, file2) {
try {
const stream1 = fs.createReadStream(file1);
const stream2 = fs.createReadStream(file2);
// wait for both files to be open to catch any "open" errors here
// since readline has bugs about not properly reporting file open errors
// this await must be done before either call to readline.createInterface()
// to avoid race conditions that can lead to lost lines of data
await Promise.all([once(stream1, "open"), once(stream2, "open")]);
const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});
const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});
console.log('before for() loop');
for await (const line1 of rl1) {
console.log(line1);
}
console.log('finished');
} finally {
console.log('finally');
}
}
test("data/numbers.txt", "data/letters.txt").then(() => {
console.log(`done`);
}).catch(err => {
console.log('Got rejected promise:', err);
});
解决此一般问题的一种方法是进行更改readline.createInterface()
,使其不添加data
事件,并在有人添加line
事件侦听器的情况下恢复流。这样可以防止数据丢失。它将使readline接口对象安静地坐在那里,而不会丢失数据,直到其输出的接收器实际准备就绪为止。这将对异步迭代器有效,并且还可以防止将其他异步代码混入到的接口的其他使用中可能丢失line
事件。
关于此的注意事项已添加到此处的一个相关的开放式readline错误问题中。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句