How to fork/duplicate a stream

user3995789

I have an outer stream. I want to use this stream in two different ways. First way is to just listen on values. Second way is to build a new stream with flatMapConcat.

But I can't do both at once. I think I have to fork or duplicate the stream.

I've tried adding a bus but it doesn't work.

var outer = Bacon.fromArray([1, 2, 3, 4, 5]);


// 1.way build a new stream
var combined = outer.flatMapConcat(function(v) {
  return Bacon.sequentially(1000, ["a" + v, "b" + v]);
});


// 2. way use the plain stream
// outer.onValue(function(v) { console.log(v); });

// Tried to insert a bus
var forkBus = new Bacon.Bus();

forkBus.plug(outer);

forkBus.onValue(function(v) {
console.log('outer side' + v);
});

combined.take(3).log();

How can I fork/duplicate a stream so I can use it in two different ways?

voithos

The issue is that .onValue(f) registers a subscriber to the event stream, and because your stream is already buffered and ready in your example (since you used fromArray()), the stream immediately gets dispatched to the new subscriber and is consumed to its end. The same issue results if you try to setup the combined stream and call .log() on it first.

The docs for Bacon.fromArray() hints at this:

creates an EventStream that delivers the given series of values (given as array) to the first subscriber. The stream ends after these values have been delivered.

In reality, if your event stream is coming from something continuous / stochastic (like user input or click events), your code will generally be able to setup a stream with as many subscribers or sub-streams as it needs before any events actually occur, like so:

var outer = $('#some-number-input').asEventStream('input')...;

outer.onValue(function(v) { console.log(v); });

var combined = outer.flatMapConcat(function(v) {
    return Bacon.sequentially(1000, ["a" + v, "b" + v]);
});

combined.take(3).log();

// After this point, any event that occurs in `outer` will pass
// through both functions

If you want to do some action on a stream without modifying it (and without registering a subscriber, which would consume the stream), you can use doAction:

var outer = Bacon.fromArray([1, 2, 3, 4, 5]);

var actioned = outer.doAction(function(v) {
    console.log(v);
});

var combined = actioned.flatMapConcat(function(v) {
    return Bacon.sequentially(1000, ["a" + v, "b" + v]);
});

combined.take(3).log();

Collected from the Internet

Please contact [email protected] to delete if infringement.

edited at
0

Comments

0 comments
Login to comment

Related