新しいストリームの開始を指定するために特定の番兵値を出力することがあるソースストリームがあります。ストリームをに変換したいのですがIObservable<IObservable<T>>
。誰もがエレガントな方法を考えることができますか?
これでうまくいくはずです:
observable = observable
.Publish()
.RefCount();
var splitted = observable
.Window(observable.Where(x => x == SENTINEL))
.Select(c => c.Where(x => x != SENTINEL));
完全な例:
const int SENTINEL = -1;
var observable = Observable
.Interval(TimeSpan.FromMilliseconds(100))
.Select(x => x + 1)
.Take(12)
.Select(x => x % 5 == 0 ? SENTINEL : x) // Every fifth is a sentinel
.Publish()
.RefCount();
observable
.Window(observable.Where(x => x == SENTINEL))
.Select(c => c.Where(x => x != SENTINEL))
.Select((c, i) => c.Select(x => (i, x))) // Embed the index of the subsequence
.Merge() // Merge them again
.Do(x => Console.WriteLine($"Received: {x}"))
.Subscribe();
await observable.LastOrDefaultAsync(); // Wait it to end
出力:
受信:(0、1)
受信:(0、2)
受信:(0、3)
受信:(0、4)
受信:(
1、6)
受信:(1、7 )受信:(
1、8 )受信:(
1、9)
受信:(2、11 )受信:(2、12)
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加