データのパケットを定期的に送信するプロセスがあり、パケットが到着するタイミングなどに基づいてそのストリームを管理する必要があります。ある時点で、ストリームとプロセスも閉じます。
現在、これを行うためにタイマーのセットを使用していますがrxjs
、この種のことには非常に適しているように思われるので、それを使用できることを願っています。これまでのところ、私はあまり成功していません。
ストリームは定期的にパケットを送信することになっていますが、通常は大きく逸脱し、スタックすることがあります。
次の条件で、ある時点でストリームを閉じたいと思います。
startDelay
最初のパケットを送信する以上の時間がかかる場合。middleDelay
2つのパケットの間に3つ以上の一時停止がある場合。maxChannelTime
。上記のいずれかの理由でストリームを閉じようとしているときは、最初に、クリーンアップを実行できるように、丁寧に閉じるように要求します。クリーンアップ中に最終的なデータパケットを送信することもあります。しかしcleanupTime
、ストリームを閉じてそれ以上のメッセージを無視する前に、クリーンアップと最後のデータが到着するのを待つ必要はありません。
イベントをObservableでラップして、「ストリーム」を作成します。私はそれをするのに問題はありません。
ストリームを「閉じる」とは、データの送信を停止し、場合によっては閉じる(つまり死ぬ)ようにプロセスに指示することを意味します。
トリッキーな問題。
私はそれを2つのフェーズに分けました-「調整済み」(定期的にチェックしたいので)と「クリーンアップ」。
逆方向に作業すると、出力は次のようになります。
const regulated = source.takeUntil(close)
const cleanup = source.skipUntil(close).takeUntil(cleanupCloser)
const output = regulated.merge(cleanup)
「クローザー」は、閉じる時間になると放出されるオブザーバブルです(タイムアウト値ごとに1つのクローザー)。
const startTimeout = 600
const intervalTimeout = 200
const maxtimeTimeout = 3000
const cleanupTimeout = 300
const startCloser = Observable.timer(startTimeout) // emit once after initial delay
.takeUntil(source) // cancel after source emits
.mapTo('startTimeoutMarker')
const intervalCloser = source.switchMap(x => // reset interval after each source emit
Observable.timer(intervalTimeout) // emit once after intervalTimeout
.mapTo('intervalTimeoutMarker')
)
const maxtimeCloser = Observable.timer(maxtimeTimeout) // emit once after maxtime
.takeUntil(startCloser) // cancel if startTimeout
.takeUntil(intervalCloser) // cancel if intervalTimeout
.mapTo('maxtimeTimeoutMarker')
const close = Observable.merge(startCloser, intervalCloser, maxtimeCloser).take(1)
const cleanupCloser = close.switchMap(x => // start when close emits
Observable.timer(cleanupTimeout) // emit once after cleanup time
)
.mapTo('cleanupTimeoutMarker')
これが実用的なサンプルCodePenです(一度に1つずつテストを実行してください)
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加