RxJS分组发出的事件Node.js

用户名

我正在查询数据库,并按事件'db_row_receieved'的一行一行地检索结果。我正在尝试按公司ID对这些结果进行分组,但是订阅没有任何输出。

db行格式如下所示。

 // row 1
    {
        companyId: 50,
        value: 200
    }
    // row 2   
    {
        companyId: 50,
        value: 300
    }
    // row 3 
    {
        companyId: 51,
        value: 400
    }

代码:

var source = Rx.Observable.fromEvent(eventEmitter, 'db_row_receieved');
var grouped = source.groupBy((x) => { return x.companyId; });
var selectMany = grouped.selectMany(x => x.reduce((acc, v) => {
                             return acc + v.value;
                          }, 0));

var subscription = selectMany.subscribe(function (obs) {
                        console.log("value: ", obs);
                   }

预期产量:

value: 500    // from the group with companyId 50
value: 400    // from the group with companyId 51

实际输出:订阅不输出任何内容,但在使用Rx.Observable.fromArray(someArray)时有效

谁能告诉我我哪里出问题了?

阿图尔·格泽西亚克(Artur Grzesiak)

因此,问题在于reduce仅在基础流completed才会产生单个值由于事件发射器是某种无限的源,因此它始终处于活动状态。

看一下下面的代码片段-第一个示例完成了,其他示例没有完成。

const data = [
  {k: 'A', v: 1},
  {k: 'B', v: 10},
  {k: 'A', v: 1},
  {k: 'B', v: 10},
  {k: 'A', v: 1},
  {k: 'B', v: 10},
  {k: 'A', v: 1},
  {k: 'A', v: 1},
  {k: 'A', v: 1},
];

Rx.Observable.from(data)
  .concatMap(d => Rx.Observable.of(d).delay(100))
  .groupBy(d => d.k)
  .mergeMap(group => group.reduce((acc, value) => {
    acc.sum += value.v;
    return acc;
  }, {key: group.key, sum: 0}))
  .do(d => console.log('RESULT', d.key, d.sum))
  .subscribe();
  
Rx.Observable.from(data)
  .concatMap(d => Rx.Observable.of(d).delay(100))
  .merge(Rx.Observable.never()) // MERGIN NEVER IN
  // .take(data.length) // UNCOMMENT TO MITIGATE NEVER
  .groupBy(d => d.k)
  .mergeMap(group => group.reduce((acc, value) => {
    acc.sum += value.v;
    return acc;
  }, {key: group.key, sum: 0}))
  .do(d => console.log('RESULT - NEVER - WILL NOT BE PRINTED', d))
  .subscribe();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.0-beta.10/Rx.umd.js"></script>

我不知道您的特定用例,但想到的两个最常见的事情是:

  • 使用scan(可能带有反跳),
  • 使用takeUntil,如果有指示下属流的结束的事件。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

如何从Node.js中的构造函数发出/捕获事件?

来自分类Dev

Reactive JS (RxJs) 按键分组

来自分类Dev

如何使用Socket.io和Node.js发出事件?

来自分类Dev

一定时间后,node.js发出事件

来自分类Dev

如何在Node.js中动态发出事件名称?

来自分类Dev

Node.js-从另一个函数/对象发出事件

来自分类Dev

Node.js批处理下载脚本:在JSONStream的“ on”事件中发出HTTP请求

来自分类Dev

如何在Node.js中动态发出事件名称?

来自分类Dev

一定时间后,node.js发出事件

来自分类Dev

如何识别来自哪个流,事件是在 node.js 中发出的

来自分类Dev

Node JS 同步事件

来自分类Dev

Node.js-发出vs函数调用

来自分类Dev

无法在 node.js 中发出请求

来自分类Dev

Node.js事件循环

来自分类Dev

Node.JS事件错误

来自分类Dev

Angular.js-向指令发出定期事件

来自分类Dev

通过事件发出来调用JS函数

来自分类Dev

从发出的事件 vue js 更新时计算未运行

来自分类Dev

通过坐标 x, y 结构 js 发出点击事件

来自分类Dev

在node.js-mongodb中分组

来自分类Dev

如何按发出事件的字段分组?

来自分类Dev

发出HTTP请求时,node.js识别错误

来自分类Dev

如何在node.js中发出https请求

来自分类Dev

使用node.js向API发出请求

来自分类Dev

Node.js:通过链接发出删除请求

来自分类Dev

向Node.js中的JSON API发出GET请求?

来自分类Dev

node.js和路由:按类发出

来自分类Dev

如何使用 Node.JS 向 Ghostbin 发出 POST 请求?

来自分类Dev

Node.js - Socket.io-client 不发出数据