如何使用异步I / O将实时数据集写入磁盘?

jfriend00

我是开发Node.js的新手(尽管在客户端JavaScript方面相对有经验),并且在处理Node.js中的异步操作时遇到了很多有关良好实践的问题。

我的特定问题(尽管我想这是一个相当通用的主题)是我有一个node.js应用程序(在Raspberry Pi上运行),该应用程序每10秒记录一次从多个温度探测器到内存数据结构的读数。这样很好。数据会随着时间在内存中累积,并且随着累积并达到特定大小阈值,数据会定期进行老化(仅保留最后N天的数据),以防止数据超过一定大小。此温度数据用于控制其他一些电器。

然后,我有一个单独的间隔计时器,每隔一段时间就会将此数据写出到磁盘上(如果进程崩溃,则将其持久保存)。我使用的是异步的Node.js( fs.open()fs.write()fs.close())磁盘IO将数据写入到磁盘中。

而且,由于磁盘IO的异步特性,在我看来,我尝试写入磁盘的数据结构可能会在我将其写入磁盘时被修改。那可能是一件坏事。如果仅在写入磁盘时将数据追加到数据结构中,则实际上不会导致我写入数据的方式出现问题,但是在某些情况下,可以在记录新数据时修改较早的数据这真的会干扰我写磁盘过程中的完整性。

我可以想到我可以在代码中放入的各种难看的防护措施,例如:

  1. 切换到同步IO以将数据写入磁盘(出于服务器响应性原因,实际上并不想这样做)。
  2. 当我开始写入数据时设置一个标志,并且在设置该标志时不记录任何新数据(导致我在写入过程中丢失数据记录)。
  3. 选项2的更复杂版本,我设置了标志,并且当设置了标志时,新数据进入一个单独的临时数据结构中,当文件IO完成后,新数据将与真实数据合并(可行,但看起来很丑)。
  4. 为原始数据制作快照副本,然后花时间将副本写入磁盘,因为其他人都不会修改该副本。我不想这样做,因为数据集相对较大,并且我的内存环境有限(Raspberry PI)。

因此,我的问题是,当其他操作可能要在异步IO期间修改数据时,使用异步IO写入大型数据集的设计模式是什么?除了上面列出的特定变通办法以外,还有其他通用的方式来处理我的问题吗?

迈克·S

您的问题是数据同步传统上,这是通过locks / mutexs解决的,但是javascript / node实际上并没有内置的东西。

那么,我们如何在节点中解决这个问题呢?我们使用队列。我个人使用异步模块中队列功能

队列通过保留需要执行的任务列表来工作,并且仅在上一个任务完成后才按照添加到队列中的顺序执行这些任务(类似于您的选项3)。

排队动画

注意:异步模块的queue方法实际上可以同时运行多个任务(如上面的动画所示),但是由于我们在这里讨论数据同步,所以我们不希望这样。幸运的是,我们可以告诉它一次只运行一个。

在您的特定情况下,您要设置一个队列,该队列可以执行两种类型的任务:

  1. 修改您的数据结构
  2. 将数据结构写入磁盘

每当您从温度探测器获得新数据时,就将任务添加到队列中,以使用该新数据修改数据结构。然后,每当间隔计时器触发时,将任务添加到将数据结构写入磁盘的队列中。

由于队列一次只能执行一个任务,因此按照将它们添加到队列的顺序进行操作,因此可以确保在将数据写入磁盘时,您永远都不会修改内存中的数据结构。

一个非常简单的实现可能看起来像:

var dataQueue = async.queue(function(task, callback) {
    if (task.type === "newData") {
        memoryStore.add(task.data); // modify your data structure however you do it now
        callback(); // let the queue know the task is done; you can pass an error here as usual if needed
    } else if (task.type === "writeData") {
        fs.writeFile(task.filename, JSON.stringify(memoryStore), function(err) {
            // error handling
            callback(err); // let the queue know the task is done
        })
    } else {
        callback(new Error("Unknown Task")); // just in case we get a task we don't know about
    }
}, 1); // The 1 here is setting the concurrency of the queue so that it will only run one task at a time

// call when you get new probe data
funcion addNewData(data) {
    dataQueue.push({task: "newData", data: data}, function(err) {
        // called when the task is complete; optional
    });
}

// write to disk every 5 minutes
setInterval(function() {
    dataQueue.push({task: "writeData", filename: "somefile.dat"}, function(err) {
        // called when the task is complete; optional
    });
}, 18000);

还要注意,您现在可以异步将数据添加到数据结构中。假设您添加了一个新探针,该探针会在事件值更改时触发该事件。您可以addNewData(data)像处理现有探针一样,而不必担心它与正在进行的修改或磁盘写入冲突(如果您开始写入数据库而不是内存中的数据存储,这确实会发挥作用)。


更新:使用以下更优雅的实现bind()

这个想法是,您bind()可以将参数绑定到函数,然后将bind()返回的新绑定函数推入队列。这样,您无需将一些自定义对象推送到它必须解释的队列中。您只需给它一个函数调用即可,所有设置都已经带有正确的参数。唯一的警告是该函数必须将回调作为其最后一个参数。

那应该允许您使用所有现有功能(可能需要进行一些修改),并在需要确保它们不会同时运行时将它们推入队列。

我将其组合在一起以测试概念:

var async = require('async');

var dataQueue = async.queue(function(task, callback) {
    // task is just a function that takes a callback; call it
    task(callback); 
}, 1); // The 1 here is setting the concurrency of the queue so that it will only run one task at a time

function storeData(data, callback) {
    setTimeout(function() { // simulate async op
        console.log('store', data);
        callback(); // let the queue know the task is done
    }, 50);
}

function writeToDisk(filename, callback) {
    setTimeout(function() { // simulate async op
        console.log('write', filename);
        callback(); // let the queue know the task is done
    }, 250);
}

// store data every second
setInterval(function() {
    var data = {date: Date.now()}
    var boundStoreData = storeData.bind(null, data);
    dataQueue.push(boundStoreData, function(err) {
        console.log('store complete', data.date);
    })
}, 1000)

// write to disk every 2 seconds
setInterval(function() {
    var filename = Date.now() + ".dat"
    var boundWriteToDisk = writeToDisk.bind(null, filename);
    dataQueue.push(boundWriteToDisk, function(err) {
        console.log('write complete', filename);
    });
}, 2000);

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

将新数据添加到特定子级后,如何使用云功能将数据写入Firebase实时数据库

来自分类Dev

我们如何将数据写入firebase实时数据库?即使启用了读/写,似乎也不起作用

来自分类Dev

如何使用 Intent 传递实时数据

来自分类Dev

Firebase,如何限制从特定数据集的检索请求(实时数据库)

来自分类Dev

如何将包含html数据的数据帧写入磁盘?

来自分类Dev

如何使用指定的键将孩子推送到 Firebase 实时数据库?

来自分类Dev

如何使用Firebase实时数据库中的自定义声明限制读取/写入记录的数量?

来自分类Dev

如何使用Streams在Dart中使用异步输入成功执行文件I / O

来自分类Dev

如何将庞大的实时数据库变成小型测试数据库?

来自分类Dev

React + Firebase:如何添加项目以将数据列表到firebase实时数据库中

来自分类Dev

如何将数据从Google Pub / Sub发送到Firebase实时数据库

来自分类Dev

将数据推送到 Firebase 实时数据库时如何更改密钥

来自分类Dev

使用laravel将Firebase实时数据库连接到MYSQL数据库表

来自分类Dev

如何使用随机生成的ID将现有数据更新到Firebase实时数据库中的数据

来自分类Dev

如何更改“实时数据库”?

来自分类Dev

如何实现嵌套的实时数据?

来自分类Dev

使用OpenGL绘制实时数据

来自分类Dev

实现类以使用实时数据

来自分类Dev

使用Angular检索Firebase实时数据

来自分类Dev

使用MATLAB绘制实时数据

来自分类Dev

使用OpenGL绘制实时数据

来自分类Dev

使用 PHP 进行实时数据更新

来自分类Dev

如何验证我的异步/等待正在使用I / O完成端口?

来自分类Dev

如何将软实时数据从Matlab流传输到C#应用程序?

来自分类Dev

如何将价值与Firebase实时数据库的子项进行比较?

来自分类Dev

如何将实时数据通过网络抓取到Google表格中

来自分类Dev

GTFS 实时数据:如何将 Feed 消息打印到终端转换为 GTFS

来自分类Dev

如何将实时数据从 nodejs 服务器推送到 AngularJS?

来自分类Dev

如何异步处理子进程的 I/O?

Related 相关文章

  1. 1

    将新数据添加到特定子级后,如何使用云功能将数据写入Firebase实时数据库

  2. 2

    我们如何将数据写入firebase实时数据库?即使启用了读/写,似乎也不起作用

  3. 3

    如何使用 Intent 传递实时数据

  4. 4

    Firebase,如何限制从特定数据集的检索请求(实时数据库)

  5. 5

    如何将包含html数据的数据帧写入磁盘?

  6. 6

    如何使用指定的键将孩子推送到 Firebase 实时数据库?

  7. 7

    如何使用Firebase实时数据库中的自定义声明限制读取/写入记录的数量?

  8. 8

    如何使用Streams在Dart中使用异步输入成功执行文件I / O

  9. 9

    如何将庞大的实时数据库变成小型测试数据库?

  10. 10

    React + Firebase:如何添加项目以将数据列表到firebase实时数据库中

  11. 11

    如何将数据从Google Pub / Sub发送到Firebase实时数据库

  12. 12

    将数据推送到 Firebase 实时数据库时如何更改密钥

  13. 13

    使用laravel将Firebase实时数据库连接到MYSQL数据库表

  14. 14

    如何使用随机生成的ID将现有数据更新到Firebase实时数据库中的数据

  15. 15

    如何更改“实时数据库”?

  16. 16

    如何实现嵌套的实时数据?

  17. 17

    使用OpenGL绘制实时数据

  18. 18

    实现类以使用实时数据

  19. 19

    使用Angular检索Firebase实时数据

  20. 20

    使用MATLAB绘制实时数据

  21. 21

    使用OpenGL绘制实时数据

  22. 22

    使用 PHP 进行实时数据更新

  23. 23

    如何验证我的异步/等待正在使用I / O完成端口?

  24. 24

    如何将软实时数据从Matlab流传输到C#应用程序?

  25. 25

    如何将价值与Firebase实时数据库的子项进行比较?

  26. 26

    如何将实时数据通过网络抓取到Google表格中

  27. 27

    GTFS 实时数据:如何将 Feed 消息打印到终端转换为 GTFS

  28. 28

    如何将实时数据从 nodejs 服务器推送到 AngularJS?

  29. 29

    如何异步处理子进程的 I/O?

热门标签

归档