Flux.just("a", "b", "c")
.log(null, Level.INFO,true) // line 18
.flatMap(value -> Mono.just(value.toUpperCase())
.publishOn(Schedulers.elastic()), 2)
.log(null, Level.INFO,true) // line 21
.subscribe();
出力の一部:
13:03:46 [main] INFO - | request(2) Flux.log(App.java:18)
13:03:46 [main] INFO - | onNext(a) Flux.log(App.java:18)
13:03:46 [main] INFO - | onNext(b) Flux.log(App.java:18)
13:03:46 [elastic-2] INFO - onNext(A) Flux.log(App.java:21)
13:03:46 [elastic-2] INFO - | request(1) Flux.log(App.java:18)
13:03:46 [main] INFO - | onNext(c) Flux.log(App.java:18)
13:03:46 [elastic-3] INFO - onNext(B) Flux.log(App.java:21)
13:03:46 [elastic-3] INFO - | request(1) Flux.log(App.java:18)
13:03:46 [elastic-2] INFO - onNext(C) Flux.log(App.java:21)
13:03:46 [elastic-2] INFO - | request(1) Flux.log(App.java:18)
13:03:46 [main] INFO - | onComplete() Flux.log(App.java:18)
13:03:46 [main] INFO - onComplete() Flux.log(App.java:21)
質問:
スレッドflatMap
から2つの要素をmain
要求してから、他のスレッドからさらに要素を要求するのはなぜですか?
スレッドでsubscribe
処理されないのはなぜmain
ですか?
最初のSubscription.request
量は、指定した同時実行レベル(2
。)によって異なります。.subscribe
メインスレッドを呼び出すため、最初のprefetch
リクエストはそのスレッドで正確に呼び出されます。
次のスキーマを見てみましょう。
.subscribe()[Thread main]
-> FluxLog.source.subscribe()[Tread Main]
-> FluxFlatMap.source.subscribe()[ThreadMain]
-> FluxJust.subscriber.onSubscribe()
->FluxFlatMap.subscription.request(concurrency)[Thread Main]
そして、その時点からハードコアになります:)。内側のストリームがサブスクライブさFlatMapInner
れるため、(onNext、onError、onComplete)のすべてのシグナルがScheduler.elastic
(のために)監視されます.publishOn
。次に、内部ストリームが完了すると、そのFlatMapInnner
上で、メカニズム全体のドライバーであるonComplete
メインFlatMapMain
に通知しflatMap
ます。間の相互作用FlatMapInner
とFlatMapMain
オーバーされていますFlatMapMain.innerComplete
。FlatMapMainの観点からは、internalFlatMapInner
がの役割を果たしているためQueue
、すべての要素は次のようになります。drained
。落ち着いて、ここで何が起こっているのかわからなくても慌てないでください。この方法のすべてのアイデアは、内部ストリームからデータを排出してダウンストリームに送信し、データの新しい部分をアップストリームに要求することです。覚えておくべきことは、innerComplete
呼び出されFlatMapInner.onComplete
た場所から別のスケジューラーに移動されたということです。つまり、nextSubscription.request
はで指定されたスレッドから呼び出されるということです。Mono.just(value.toUpperCase()).publishOn(Schedulers.elastic())
したがって、概略的には、そのプロセスは次のようになります。
FluxFlatMap.FlatMapMain.onNext [Thread Main]
-> Publisher m = mapper(...)
-> m.subscribe(new FluxFlatMap.FlatMapInner())
-> FluxFlatMap.FlatMapInner.onNext("a") [Thread Elastic N]
-> LambdaSubscriber.onNext("c") [Thread Elastic N]
-> FluxFlatMap.FlatMapInner.onComplete() [Thread Elastic N]
-> FluxFlatMap.FlatMapMain.drainLoop() [Thread Elastic N]
-> FluxFlatMap.FlatMapMain.drainLoop() [Thread Elastic N] { ... subscription.request(amountOfCompletedInners)
-> FlatMap.FlatMapMain.onNext() [Thread Elastic N]
-> ....- LambdaSubscriber.onNext("c") [Thread Elastic N]
>...。
したがって、メインに最初のrequest(2)が表示され、次にエラスティックからのrequest(1)が表示されます(1つの内部が完了しているため、FlatMapは並行性の要求を満たすためにアップストリームから別の1つの要素を要求します)。
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加