我正在查询数据库,并按事件'db_row_receieved'的一行一行地检索结果。我正在尝试按公司ID对这些结果进行分组,但是订阅没有任何输出。
db行格式如下所示。
// row 1
{
companyId: 50,
value: 200
}
// row 2
{
companyId: 50,
value: 300
}
// row 3
{
companyId: 51,
value: 400
}
代码:
var source = Rx.Observable.fromEvent(eventEmitter, 'db_row_receieved');
var grouped = source.groupBy((x) => { return x.companyId; });
var selectMany = grouped.selectMany(x => x.reduce((acc, v) => {
return acc + v.value;
}, 0));
var subscription = selectMany.subscribe(function (obs) {
console.log("value: ", obs);
}
预期产量:
value: 500 // from the group with companyId 50
value: 400 // from the group with companyId 51
实际输出:订阅不输出任何内容,但在使用Rx.Observable.fromArray(someArray)时有效
谁能告诉我我哪里出问题了?
因此,问题在于reduce
仅在基础流complete
d才会产生单个值。由于事件发射器是某种无限的源,因此它始终处于活动状态。
看一下下面的代码片段-第一个示例完成了,其他示例没有完成。
const data = [
{k: 'A', v: 1},
{k: 'B', v: 10},
{k: 'A', v: 1},
{k: 'B', v: 10},
{k: 'A', v: 1},
{k: 'B', v: 10},
{k: 'A', v: 1},
{k: 'A', v: 1},
{k: 'A', v: 1},
];
Rx.Observable.from(data)
.concatMap(d => Rx.Observable.of(d).delay(100))
.groupBy(d => d.k)
.mergeMap(group => group.reduce((acc, value) => {
acc.sum += value.v;
return acc;
}, {key: group.key, sum: 0}))
.do(d => console.log('RESULT', d.key, d.sum))
.subscribe();
Rx.Observable.from(data)
.concatMap(d => Rx.Observable.of(d).delay(100))
.merge(Rx.Observable.never()) // MERGIN NEVER IN
// .take(data.length) // UNCOMMENT TO MITIGATE NEVER
.groupBy(d => d.k)
.mergeMap(group => group.reduce((acc, value) => {
acc.sum += value.v;
return acc;
}, {key: group.key, sum: 0}))
.do(d => console.log('RESULT - NEVER - WILL NOT BE PRINTED', d))
.subscribe();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.0-beta.10/Rx.umd.js"></script>
我不知道您的特定用例,但想到的两个最常见的事情是:
scan
(可能带有反跳),takeUntil
,如果有指示下属流的结束的事件。本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句