rxjsのタイミングに基づいてイベントのストリームを処理する

グレッグロス

データのパケットを定期的に送信するプロセスがあり、パケットが到着するタイミングなどに基づいてそのストリームを管理する必要があります。ある時点で、ストリームとプロセスも閉じます。

現在、これを行うためにタイマーのセットを使用していますがrxjs、この種のことには非常に適しているように思われるので、それを使用できることを願っていますこれまでのところ、私はあまり成功していません。

問題

ストリームは定期的にパケットを送信することになっていますが、通常は大きく逸脱し、スタックすることがあります。

次の条件で、ある時点でストリームを閉じたいと思います。

  1. startDelay最初のパケットを送信する以上の時間がかかる場合
  2. 最初のパケットが送信された後、middleDelay2つのパケットの間に3つ以上の一時停止がある場合
  3. 一定期間後maxChannelTime

上記のいずれかの理由でストリームを閉じようとしているときは、最初に、クリーンアップを実行できるように、丁寧に閉じるように要求します。クリーンアップ中に最終的なデータパケットを送信することもあります。しかしcleanupTime、ストリームを閉じてそれ以上のメッセージを無視する前に、クリーンアップと最後のデータが到着するのを待つ必要はありません

精緻化

イベントをObservableでラップして、「ストリーム」を作成します。私はそれをするのに問題はありません。

ストリームを「閉じる」とは、データの送信を停止し、場合によっては閉じる(つまり死ぬ)ようにプロセスに指示することを意味します。

リチャード・マッセン

トリッキーな問題。

私はそれを2つのフェーズに分けました-「調整済み」(定期的にチェックしたいので)と「クリーンアップ」。

逆方向に作業すると、出力は次のようになります。

const regulated = source.takeUntil(close)
const cleanup = source.skipUntil(close).takeUntil(cleanupCloser)
const output = regulated.merge(cleanup)

「クローザー」は、閉じる時間になると放出されるオブザーバブルです(タイムアウト値ごとに1つのクローザー)。

const startTimeout = 600
const intervalTimeout = 200
const maxtimeTimeout = 3000
const cleanupTimeout = 300

const startCloser = Observable.timer(startTimeout)  // emit once after initial delay
  .takeUntil(source)                                // cancel after source emits
  .mapTo('startTimeoutMarker')

const intervalCloser = source.switchMap(x =>    // reset interval after each source emit
    Observable.timer(intervalTimeout)           // emit once after intervalTimeout
      .mapTo('intervalTimeoutMarker')
  )

const maxtimeCloser = Observable.timer(maxtimeTimeout)  // emit once after maxtime
  .takeUntil(startCloser)                               // cancel if startTimeout
  .takeUntil(intervalCloser)                            // cancel if intervalTimeout
  .mapTo('maxtimeTimeoutMarker')

const close = Observable.merge(startCloser, intervalCloser, maxtimeCloser).take(1)

const cleanupCloser = close.switchMap(x =>      // start when close emits
     Observable.timer(cleanupTimeout)           // emit once after cleanup time
  ) 
  .mapTo('cleanupTimeoutMarker')

これが実用的なサンプルCodePenです(一度に1つずつテストを実行してください)

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

分類Dev

タイムスタンプ値に基づいてデータをストリーミングおよび処理する(KafkaおよびSparkストリーミングを使用)

分類Dev

Rxjs:リストの内部値に基づいてデータをフィルタリングする方法は?

分類Dev

複数の条件に基づいてリストアイテムをフィルタリングする

分類Dev

db内の製品に基づいてカートアイテムをフィルタリングする

分類Dev

flinkストリーミングで将来のイベントを処理する方法は?

分類Dev

インタレストベースに基づいていくつかのグループを作成します

分類Dev

シリーズマスターイベントに基づいてすべての定期的なイベントをフェッチする方法

分類Dev

他のリストに基づいて1つのリストをストリーミングおよびフィルタリングする

分類Dev

Rxjsは、別のストリームに基づいて構成可能なストリームをフィルタリングします

分類Dev

文字列リストをストリーミングし、各アイテムの番号に基づいて並べ替える方法

分類Dev

列の値に基づいてテキスト ファイルをフィルター処理する

分類Dev

実際のイベントデータに基づいて人工的な非イベントを生成する

分類Dev

異なるデータフレームの時間間隔に基づいて「イベントダミー」(?)を挿入する方法

分類Dev

Python-データフレームの値に基づいてイベントを検証する

分類Dev

パンダのリストに基づいてDataFrameのアイテムをフィルタリングするにはどうすればよいですか?

分類Dev

logcatのこのメッセージでアプリがフリーズします「IMEが2500ミリ秒後に入力イベントを処理するのを待っているタイムアウト」

分類Dev

MIMEタイプのタイプに基づいてRESTリクエストを処理します

分類Dev

各ユーザーのタイムスタンプに基づいてCloudFunctionsを定期的に実行するためのベストプラクティス

分類Dev

インベントリ ファイル内のホストを処理する前にハウスキーピング タスクを実行する方法は?

分類Dev

特定の条件に基づいてデータフレームから特定のデータポイントをフィルタリングする

分類Dev

JavaScript-この関数の構造に基づいてイベントリスナーを削除する方法は?

分類Dev

値のタイプに基づいてキーのサブリストのインターフェイスキーをフィルタリングします

分類Dev

Pythonのインデックス値に基づいてデータフレームをフィルター処理する

分類Dev

ボタンが現在のリクエストをすでに処理している場合にボタンのクリックイベントを防ぐ方法

分類Dev

タイムスタンプ条件に基づいて配列内の一意のオブジェクトをフィルタリングする

分類Dev

今後の/ライブイベントに基づいてFirebaseデータベースからイベントを取得する方法

分類Dev

Java 8:リストをストリーミングし、さまざまなフィルターに基づいて別のリストにマップする

分類Dev

ダイアログのイベントに基づいてメインウィンドウにリストを入力する方法

分類Dev

シェルスクリプトのタイプに基づいて配列の内容を処理します

Related 関連記事

  1. 1

    タイムスタンプ値に基づいてデータをストリーミングおよび処理する(KafkaおよびSparkストリーミングを使用)

  2. 2

    Rxjs:リストの内部値に基づいてデータをフィルタリングする方法は?

  3. 3

    複数の条件に基づいてリストアイテムをフィルタリングする

  4. 4

    db内の製品に基づいてカートアイテムをフィルタリングする

  5. 5

    flinkストリーミングで将来のイベントを処理する方法は?

  6. 6

    インタレストベースに基づいていくつかのグループを作成します

  7. 7

    シリーズマスターイベントに基づいてすべての定期的なイベントをフェッチする方法

  8. 8

    他のリストに基づいて1つのリストをストリーミングおよびフィルタリングする

  9. 9

    Rxjsは、別のストリームに基づいて構成可能なストリームをフィルタリングします

  10. 10

    文字列リストをストリーミングし、各アイテムの番号に基づいて並べ替える方法

  11. 11

    列の値に基づいてテキスト ファイルをフィルター処理する

  12. 12

    実際のイベントデータに基づいて人工的な非イベントを生成する

  13. 13

    異なるデータフレームの時間間隔に基づいて「イベントダミー」(?)を挿入する方法

  14. 14

    Python-データフレームの値に基づいてイベントを検証する

  15. 15

    パンダのリストに基づいてDataFrameのアイテムをフィルタリングするにはどうすればよいですか?

  16. 16

    logcatのこのメッセージでアプリがフリーズします「IMEが2500ミリ秒後に入力イベントを処理するのを待っているタイムアウト」

  17. 17

    MIMEタイプのタイプに基づいてRESTリクエストを処理します

  18. 18

    各ユーザーのタイムスタンプに基づいてCloudFunctionsを定期的に実行するためのベストプラクティス

  19. 19

    インベントリ ファイル内のホストを処理する前にハウスキーピング タスクを実行する方法は?

  20. 20

    特定の条件に基づいてデータフレームから特定のデータポイントをフィルタリングする

  21. 21

    JavaScript-この関数の構造に基づいてイベントリスナーを削除する方法は?

  22. 22

    値のタイプに基づいてキーのサブリストのインターフェイスキーをフィルタリングします

  23. 23

    Pythonのインデックス値に基づいてデータフレームをフィルター処理する

  24. 24

    ボタンが現在のリクエストをすでに処理している場合にボタンのクリックイベントを防ぐ方法

  25. 25

    タイムスタンプ条件に基づいて配列内の一意のオブジェクトをフィルタリングする

  26. 26

    今後の/ライブイベントに基づいてFirebaseデータベースからイベントを取得する方法

  27. 27

    Java 8:リストをストリーミングし、さまざまなフィルターに基づいて別のリストにマップする

  28. 28

    ダイアログのイベントに基づいてメインウィンドウにリストを入力する方法

  29. 29

    シェルスクリプトのタイプに基づいて配列の内容を処理します

ホットタグ

アーカイブ