我是开发Node.js的新手(尽管在客户端JavaScript方面相对有经验),并且在处理Node.js中的异步操作时遇到了很多有关良好实践的问题。
我的特定问题(尽管我想这是一个相当通用的主题)是我有一个node.js应用程序(在Raspberry Pi上运行),该应用程序每10秒记录一次从多个温度探测器到内存数据结构的读数。这样很好。数据会随着时间在内存中累积,并且随着累积并达到特定大小阈值,数据会定期进行老化(仅保留最后N天的数据),以防止数据超过一定大小。此温度数据用于控制其他一些电器。
然后,我有一个单独的间隔计时器,每隔一段时间就会将此数据写出到磁盘上(如果进程崩溃,则将其持久保存)。我使用的是异步的Node.js( fs.open()
,fs.write()
和fs.close()
)磁盘IO将数据写入到磁盘中。
而且,由于磁盘IO的异步特性,在我看来,我尝试写入磁盘的数据结构可能会在我将其写入磁盘时被修改。那可能是一件坏事。如果仅在写入磁盘时将数据追加到数据结构中,则实际上不会导致我写入数据的方式出现问题,但是在某些情况下,可以在记录新数据时修改较早的数据这真的会干扰我写磁盘过程中的完整性。
我可以想到我可以在代码中放入的各种难看的防护措施,例如:
因此,我的问题是,当其他操作可能要在异步IO期间修改数据时,使用异步IO写入大型数据集的设计模式是什么?除了上面列出的特定变通办法以外,还有其他通用的方式来处理我的问题吗?
您的问题是数据同步。传统上,这是通过locks / mutexs解决的,但是javascript / node实际上并没有内置的东西。
那么,我们如何在节点中解决这个问题呢?我们使用队列。我个人使用异步模块中的队列功能。
队列通过保留需要执行的任务列表来工作,并且仅在上一个任务完成后才按照添加到队列中的顺序执行这些任务(类似于您的选项3)。
注意:异步模块的queue方法实际上可以同时运行多个任务(如上面的动画所示),但是由于我们在这里讨论数据同步,所以我们不希望这样。幸运的是,我们可以告诉它一次只运行一个。
在您的特定情况下,您要设置一个队列,该队列可以执行两种类型的任务:
每当您从温度探测器获得新数据时,就将任务添加到队列中,以使用该新数据修改数据结构。然后,每当间隔计时器触发时,将任务添加到将数据结构写入磁盘的队列中。
由于队列一次只能执行一个任务,因此按照将它们添加到队列的顺序进行操作,因此可以确保在将数据写入磁盘时,您永远都不会修改内存中的数据结构。
一个非常简单的实现可能看起来像:
var dataQueue = async.queue(function(task, callback) {
if (task.type === "newData") {
memoryStore.add(task.data); // modify your data structure however you do it now
callback(); // let the queue know the task is done; you can pass an error here as usual if needed
} else if (task.type === "writeData") {
fs.writeFile(task.filename, JSON.stringify(memoryStore), function(err) {
// error handling
callback(err); // let the queue know the task is done
})
} else {
callback(new Error("Unknown Task")); // just in case we get a task we don't know about
}
}, 1); // The 1 here is setting the concurrency of the queue so that it will only run one task at a time
// call when you get new probe data
funcion addNewData(data) {
dataQueue.push({task: "newData", data: data}, function(err) {
// called when the task is complete; optional
});
}
// write to disk every 5 minutes
setInterval(function() {
dataQueue.push({task: "writeData", filename: "somefile.dat"}, function(err) {
// called when the task is complete; optional
});
}, 18000);
还要注意,您现在可以异步将数据添加到数据结构中。假设您添加了一个新探针,该探针会在事件值更改时触发该事件。您可以addNewData(data)
像处理现有探针一样,而不必担心它与正在进行的修改或磁盘写入冲突(如果您开始写入数据库而不是内存中的数据存储,这确实会发挥作用)。
更新:使用以下更优雅的实现bind()
这个想法是,您bind()
可以将参数绑定到函数,然后将bind()
返回的新绑定函数推入队列。这样,您无需将一些自定义对象推送到它必须解释的队列中。您只需给它一个函数调用即可,所有设置都已经带有正确的参数。唯一的警告是该函数必须将回调作为其最后一个参数。
那应该允许您使用所有现有功能(可能需要进行一些修改),并在需要确保它们不会同时运行时将它们推入队列。
我将其组合在一起以测试概念:
var async = require('async');
var dataQueue = async.queue(function(task, callback) {
// task is just a function that takes a callback; call it
task(callback);
}, 1); // The 1 here is setting the concurrency of the queue so that it will only run one task at a time
function storeData(data, callback) {
setTimeout(function() { // simulate async op
console.log('store', data);
callback(); // let the queue know the task is done
}, 50);
}
function writeToDisk(filename, callback) {
setTimeout(function() { // simulate async op
console.log('write', filename);
callback(); // let the queue know the task is done
}, 250);
}
// store data every second
setInterval(function() {
var data = {date: Date.now()}
var boundStoreData = storeData.bind(null, data);
dataQueue.push(boundStoreData, function(err) {
console.log('store complete', data.date);
})
}, 1000)
// write to disk every 2 seconds
setInterval(function() {
var filename = Date.now() + ".dat"
var boundWriteToDisk = writeToDisk.bind(null, filename);
dataQueue.push(boundWriteToDisk, function(err) {
console.log('write complete', filename);
});
}, 2000);
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句