RxJava merge()でアクティブなストリームの数を制限するにはどうすればよいですか?

user6587056

複数のイベントストリームをmerge()演算子で結合し、「アクティブストリーム」の数を制限する方法を探しています。たとえば、6つのストリームをマージすると(すべてが異なるスレッドで動作している)、merge()はそれらすべてをサブスクライブしますが、最初は3つをサブスクライブする必要があり、そのうちの1つが完了すると別のストリームをサブスクライブする必要があります6つのストリームがすべて完了するまで。マージまたは他のRxJavaオペレーターでこれを達成することは可能ですか、それとも自分で書く必要がありますか?

ポール

マージ演算子で値maxConcurrencyを使用できますが、すべてのObservableを1つにラップする必要があります。

  @Test
    public void testMergeMaxConcurrency() {
        Observable.merge(Observable.just(
                 Observable.just(3),
                 Observable.just(5),
                 Observable.just(1),
                 Observable.just(4),
                 Observable.just(2)), 2)
                .collect(ArrayList<Integer>::new, ArrayList::add)
                .doOnNext(Collections::sort)
                .subscribe(System.out::println);
    }

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

分類Dev

RxJava間隔/無限カウンターをKotlinFlowでリアクティブに実装するにはどうすればよいですか?

分類Dev

RxJavaで、APIから生成された潜在的に無限のイベントストリームを開始するにはどうすればよいですか?

分類Dev

RxJava 2:「List <Item>」を発行する無限ストリームのアイテムをフィルタリングするにはどうすればよいですか?

分類Dev

RXJava / Androidでオブジェクトのリストのフィールドを変更するにはどうすればよいですか

分類Dev

RxJava 2でTestObserverを使用してオブジェクトのリストを単体テストするにはどうすればよいですか?

分類Dev

rxjava delay:リストから発行された各アイテムの可変遅延を取得するにはどうすればよいですか?

分類Dev

RxJava:複数のリクエストが同時に送信されたときにトークンを更新するにはどうすればよいですか?

分類Dev

RxJavaで遅延を伴うリストからアイテムを発行するにはどうすればよいですか?

分類Dev

RxJava2でエラーが返されたときに複数のアイテムを発行するにはどうすればよいですか?

分類Dev

RxJava Completable doOnSuccess関数を単体テストするにはどうすればよいですか?

分類Dev

RxJava 2でgroupByの後にリストを取得するにはどうすればよいですか?

分類Dev

RxJavaを使用して、別のストリームの要素属性を基準として使用して、1つの監視可能なストリームをフィルタリングするにはどうすればよいですか?

分類Dev

RxJavaストリームの途中で非同期操作を条件付きで追加するにはどうすればよいですか?

分類Dev

RxJavaの動的リスト上にオブザーバーを作成するにはどうすればよいですか?

分類Dev

RxJava、disposable.clear()のテストコードを書くにはどうすればよいですか?

分類Dev

RxJavaでアイテムをキャッシュし、キャッシュスタンピードを回避するにはどうすればよいですか?

分類Dev

RxJavaでアイテムをキャッシュし、キャッシュスタンピードを回避するにはどうすればよいですか?

分類Dev

RxJavaを使用してdouble値のネストされたリストをJavaクラスに変換するにはどうすればよいですか?

分類Dev

rxjava2 Zip関数のアリティ(Single / Observableから)を、型を失うことなくNullable引数に一般化するにはどうすればよいですか?

分類Dev

rxjava2 Zip関数のアリティ(Single / Observableから)を、型を失うことなくNullable引数に一般化するにはどうすればよいですか?

分類Dev

タイプを失うことなく、rxjava2 Zip関数(単一/監視可能)のアリティをnオプションの引数に一般化するにはどうすればよいですか?

分類Dev

タイプを失うことなく、rxjava2 Zip関数(単一/監視可能)のアリティをnオプションの引数に一般化するにはどうすればよいですか?

分類Dev

2つのObservable(RxJava)間でサブスクライバーを再利用するにはどうすればよいですか?

分類Dev

RxJavaで遅延してコレクションからアイテムを発行するにはどうすればよいですか?

分類Dev

RxJavaを使用してこのステートメントを実行するにはどうすればよいですか?

分類Dev

RxJavaで複数のObservableを同時に実行するにはどうすればよいですか?

分類Dev

rxJava2でカスタムタスクと別のクラスを通信するにはどうすればよいですか?

分類Dev

このrxjava / rxkotlin flatMapをラムダ式に変換するにはどうすればよいですか?

分類Dev

AndroidのRxJavaでフラットマップを介してカスタムスローアブルを送信するにはどうすればよいですか?

Related 関連記事

  1. 1

    RxJava間隔/無限カウンターをKotlinFlowでリアクティブに実装するにはどうすればよいですか?

  2. 2

    RxJavaで、APIから生成された潜在的に無限のイベントストリームを開始するにはどうすればよいですか?

  3. 3

    RxJava 2:「List <Item>」を発行する無限ストリームのアイテムをフィルタリングするにはどうすればよいですか?

  4. 4

    RXJava / Androidでオブジェクトのリストのフィールドを変更するにはどうすればよいですか

  5. 5

    RxJava 2でTestObserverを使用してオブジェクトのリストを単体テストするにはどうすればよいですか?

  6. 6

    rxjava delay:リストから発行された各アイテムの可変遅延を取得するにはどうすればよいですか?

  7. 7

    RxJava:複数のリクエストが同時に送信されたときにトークンを更新するにはどうすればよいですか?

  8. 8

    RxJavaで遅延を伴うリストからアイテムを発行するにはどうすればよいですか?

  9. 9

    RxJava2でエラーが返されたときに複数のアイテムを発行するにはどうすればよいですか?

  10. 10

    RxJava Completable doOnSuccess関数を単体テストするにはどうすればよいですか?

  11. 11

    RxJava 2でgroupByの後にリストを取得するにはどうすればよいですか?

  12. 12

    RxJavaを使用して、別のストリームの要素属性を基準として使用して、1つの監視可能なストリームをフィルタリングするにはどうすればよいですか?

  13. 13

    RxJavaストリームの途中で非同期操作を条件付きで追加するにはどうすればよいですか?

  14. 14

    RxJavaの動的リスト上にオブザーバーを作成するにはどうすればよいですか?

  15. 15

    RxJava、disposable.clear()のテストコードを書くにはどうすればよいですか?

  16. 16

    RxJavaでアイテムをキャッシュし、キャッシュスタンピードを回避するにはどうすればよいですか?

  17. 17

    RxJavaでアイテムをキャッシュし、キャッシュスタンピードを回避するにはどうすればよいですか?

  18. 18

    RxJavaを使用してdouble値のネストされたリストをJavaクラスに変換するにはどうすればよいですか?

  19. 19

    rxjava2 Zip関数のアリティ(Single / Observableから)を、型を失うことなくNullable引数に一般化するにはどうすればよいですか?

  20. 20

    rxjava2 Zip関数のアリティ(Single / Observableから)を、型を失うことなくNullable引数に一般化するにはどうすればよいですか?

  21. 21

    タイプを失うことなく、rxjava2 Zip関数(単一/監視可能)のアリティをnオプションの引数に一般化するにはどうすればよいですか?

  22. 22

    タイプを失うことなく、rxjava2 Zip関数(単一/監視可能)のアリティをnオプションの引数に一般化するにはどうすればよいですか?

  23. 23

    2つのObservable(RxJava)間でサブスクライバーを再利用するにはどうすればよいですか?

  24. 24

    RxJavaで遅延してコレクションからアイテムを発行するにはどうすればよいですか?

  25. 25

    RxJavaを使用してこのステートメントを実行するにはどうすればよいですか?

  26. 26

    RxJavaで複数のObservableを同時に実行するにはどうすればよいですか?

  27. 27

    rxJava2でカスタムタスクと別のクラスを通信するにはどうすればよいですか?

  28. 28

    このrxjava / rxkotlin flatMapをラムダ式に変換するにはどうすればよいですか?

  29. 29

    AndroidのRxJavaでフラットマップを介してカスタムスローアブルを送信するにはどうすればよいですか?

ホットタグ

アーカイブ