私はRxJava2を学ぼうとしていますが、昨日の午後からRxJavaライブラリのv1からv2に変換しようとして苦労しています...私がしていることに役立つこの関数に出くわしました。私はリアクティブプログラミングパラダイム全体を理解しています。
List<Integer> emitList = ...;
Observable<Integer> observable = Observable.from(emitList);
observable
.subscribeOn(Schedulers.newThread())
.parallel((a) -> {
return a
.filter((i) -> {
return i % 2 == 0;
})
.doOnNext((xx) -> {
System.out.println("parallel thread in: " + ThreadUtils.currentThreadName());
System.out.println("parallel: " + xx);
ThreadUtils.sleep(10);
System.out.println("parallel thread out: " + ThreadUtils.currentThreadName());
});
},
Schedulers.io()
)
.subscribe(
(i) -> {
System.out.println("onNext thread entr: " + ThreadUtils.currentThreadName());
System.out.println(i);
System.out.println("onNext thread exit: " + ThreadUtils.currentThreadName());
},
(t) -> {
t.printStackTrace();
},
() -> {
System.out.println("onCompleted()");
}
);
そして、私が持っている最も遠いのはこれです:
Observable<Integer> observable = ....
observable.subscribeOn(Schedulers.newThread())
.filter(i -> i % 2 == 0)
.doOnNext(i -> {
System.out.println("parallel thread in: " + threadName());
System.out.println("parallel: " + i);
Thread.sleep(10);
})
.subscribe(
number -> System.out.println(threadName() + ": " + number),
throwable -> System.err.println(threadName() + ": " + throwable.toString()),
() -> System.out.println(threadName() + ": Completed!")
);
私がやっていることには多くの間違いがあることを私は知っています。最初に、フィルタリングとdoOnNextは並列句の内側にありますが、私の「アプローチ」ではその外側であり、誰が他に何を知っているかを知っています。RxJavaリポジトリでテストを実行しようとしましたが、これに類似するものを特定できませんでした。FlowableとParallelFlowableを見ましたが、私のバージョンで並列処理を実現する方法が見つからないという点で、それらはまったく異なります...ところで何も出力されません。
RxJava 2での並列処理Flowable
は、次のものと同じ流暢なAPI設計に関連付けられて使用されますObservable
。
Flowable<Integer> f = ....
f.subscribeOn(Schedulers.newThread())
.parallel() // <---------------------------------
.runOn(Schedulers.computation()) // <---------------------------------
.filter(i -> i % 2 == 0)
.doOnNext(i -> {
System.out.println("parallel thread in: " + threadName());
System.out.println("parallel: " + i);
Thread.sleep(10);
})
.sequential() // <---------------------------------
.subscribe(
number -> System.out.println(threadName() + ": " + number),
throwable -> System.err.println(threadName() + ": " + throwable.toString()),
() -> System.out.println(threadName() + ": Completed!")
);
Thread.sleep(10000);
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加