How do I combine three observables such that

fsl

If I have three observables, how do I combine them so that when the first emits a new value, I can combine it with the latest values emitted by the other two? It's important that we only produce a new combined value when the first observable has a new value, regardless of whether the other two have emitted new values.

Thanks.

James World

You could do this (note that you get default values for sources 2 and 3 if they have yet to emit, and I have allowed for you to optionally supply those default values):

public static class ObservableExtensions
{
    public static IObservable<TResult> CombineFirstWithLatestOfRest
        <TSource1, TSource2, TSource3, TResult>(
        this IObservable<TSource1> source1,
        IObservable<TSource2> source2,
        IObservable<TSource3> source3,
        Func<TSource1, TSource2, TSource3, TResult> resultSelector,
        TSource2 source2Default = default(TSource2),
        TSource3 source3Default = default(TSource3))
    {
        var latestOfRest = source2.CombineLatest(source3, Tuple.Create);
        return source1.Zip(latestOfRest.MostRecent(
            Tuple.Create(source2Default, source3Default)),
            (s1,s23) => resultSelector(s1, s23.Item1, s23.Item2));
    }
}

Example use:

var source1 = Observable.Interval(TimeSpan.FromSeconds(2));
var source2 = Observable.Interval(TimeSpan.FromSeconds(1));
var source3 = Observable.Interval(TimeSpan.FromSeconds(3.5));

var res = source1.CombineFirstWithLatestOfRest(source2, source3,
    (x,y,z) => string.Format("1: {0} 2: {1} 3: {2}", x,y,z));    

res.Subscribe(Console.WriteLine);

Addendum

There's a subtle problem here that might be significant in your scenario. A sometimes undesirable aspect of CombineLatest is that it does not emit until it has a value from every contributing stream. In the example above, this means the slower source3 holds up source2, so early source2 values are missed. Specifically, the results will carry the default values for both source2 and source3 until each of them has emitted at least one event. Kicking off source2 and source3 with their default values is a convenient workaround for this behaviour, which we can get away with since it is source1 that drives events:

public static class ObservableExtensions
{
    public static IObservable<TResult> CombineFirstWithLatestOfRest
        <TSource1, TSource2, TSource3, TResult>(
        this IObservable<TSource1> source1,
        IObservable<TSource2> source2,
        IObservable<TSource3> source3,
        Func<TSource1, TSource2, TSource3, TResult> resultSelector,
        TSource2 source2Default = default(TSource2),
        TSource3 source3Default = default(TSource3))
    {
        source2 = source2.StartWith(source2Default); // added this
        source3 = source3.StartWith(source3Default); // and this
        var latestOfRest = source2.CombineLatest(source3, Tuple.Create);
        return source1.Zip(latestOfRest.MostRecent(
            Tuple.Create(source2Default, source3Default)), // now redundant
            (s1,s23) => resultSelector(s1, s23.Item1, s23.Item2));
    }
}
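
To see the behaviour without depending on timers, here is a minimal sketch that drives the operator by hand with Subject<T> instead of Observable.Interval. The Demo class, its Run method, the three subjects and the defaults "none" and 0 are all made up for illustration; the extension method is repeated (with a lambda in place of the Tuple.Create method group) only so the sketch compiles on its own. It assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ObservableExtensions
{
    // Same idea as the StartWith version above, repeated so this sketch is self-contained.
    public static IObservable<TResult> CombineFirstWithLatestOfRest
        <TSource1, TSource2, TSource3, TResult>(
        this IObservable<TSource1> source1,
        IObservable<TSource2> source2,
        IObservable<TSource3> source3,
        Func<TSource1, TSource2, TSource3, TResult> resultSelector,
        TSource2 source2Default = default(TSource2),
        TSource3 source3Default = default(TSource3))
    {
        // Seed both secondary streams so CombineLatest can emit straight away.
        var latestOfRest = source2.StartWith(source2Default)
            .CombineLatest(source3.StartWith(source3Default),
                (s2, s3) => Tuple.Create(s2, s3));

        // Zip against MostRecent: one result per source1 value, paired with
        // whatever the latest (source2, source3) tuple is at that moment.
        return source1.Zip(
            latestOfRest.MostRecent(Tuple.Create(source2Default, source3Default)),
            (s1, s23) => resultSelector(s1, s23.Item1, s23.Item2));
    }
}

// Hypothetical demo harness, not part of the original answer.
public static class Demo
{
    public static List<string> Run()
    {
        var source1 = new Subject<int>();
        var source2 = new Subject<string>();
        var source3 = new Subject<int>();
        var results = new List<string>();

        using (source1.CombineFirstWithLatestOfRest(source2, source3,
                   (x, y, z) => string.Format("1: {0} 2: {1} 3: {2}", x, y, z),
                   "none", 0)
               .Subscribe(results.Add))
        {
            source1.OnNext(1);    // neither secondary source has emitted: defaults are used
            source2.OnNext("a");  // no result: only source1 drives output
            source1.OnNext(2);    // picks up "a", source3 is still at its default
            source3.OnNext(10);
            source2.OnNext("b");
            source2.OnNext("c");  // "b" is never observed in a result
            source1.OnNext(3);    // picks up "c" and 10
        }
        return results;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
        // 1: 1 2: none 3: 0
        // 1: 2 2: a 3: 0
        // 1: 3 2: c 3: 10
    }
}
```

Three source1 values go in and three results come out, no matter how many times source2 and source3 emit in between. The second line shows the effect of StartWith: source2's "a" is visible even though source3 has not produced anything yet.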
