If I have three observables, how do I combine them such that when the first emit a new value, I can combine that with the latest emitted from the other two.. Its important that we only produce a new value of the combined observables when the first observable has a new value regardles of whether the other two has emitted new values.
Thanks.
You could do this (note that you get default values for sources 2 and 3 if they have yet to emit, and I have allowed for you to optionally supply those default values):
public static class ObservableExtensions
{
public static IObservable<TResult> CombineFirstWithLatestOfRest
<TSource1, TSource2, TSource3, TResult>(
this IObservable<TSource1> source1,
IObservable<TSource2> source2,
IObservable<TSource3> source3,
Func<TSource1, TSource2, TSource3, TResult> resultSelector,
TSource2 source2Default = default(TSource2),
TSource3 source3Default = default(TSource3))
{
var latestOfRest = source2.CombineLatest(source3, Tuple.Create);
return source1.Zip(latestOfRest.MostRecent(
Tuple.Create(source2Default, source3Default)),
(s1,s23) => resultSelector(s1, s23.Item1, s23.Item2));
}
}
Example use:
var source1 = Observable.Interval(TimeSpan.FromSeconds(2));
var source2 = Observable.Interval(TimeSpan.FromSeconds(1));
var source3 = Observable.Interval(TimeSpan.FromSeconds(3.5));
var res = source1.CombineFirstWithLatestOfRest(source2, source3,
(x,y,z) => string.Format("1: {0} 2: {1} 3: {2}", x,y,z));
res.Subscribe(Console.WriteLine);
There's a subtle problem here that might be significant in your scenario. A sometimes undesirable aspect of CombineLatest
is that it does not emit until it has a value from every contributing stream. This means in the example above that the slower source3
holds up source2
and values are missed. Specifically, events returned will have the default values for both source2
and source3
until both have emitted at least one event each. Kicking off source2
and source3
with their default values is a convenient workaround for this behaviour, which we can get away with since it is source1
that drives events:
public static class ObservableExtensions
{
public static IObservable<TResult> CombineFirstWithLatestOfRest
<TSource1, TSource2, TSource3, TResult>(
this IObservable<TSource1> source1,
IObservable<TSource2> source2,
IObservable<TSource3> source3,
Func<TSource1, TSource2, TSource3, TResult> resultSelector,
TSource2 source2Default = default(TSource2),
TSource3 source3Default = default(TSource3))
{
source2 = source2.StartWith(source2Default); // added this
source3 = source3.StartWith(source3Default); // and this
var lastestOfRest = source2.CombineLatest(source3, Tuple.Create);
return source1.Zip(lastestOfRest.MostRecent(
Tuple.Create(source2Default, source3Default)), // now redundant
(s1,s23) => resultSelector(s1, s23.Item1, s23.Item2));
}
}
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句