flatMapはどのようにスレッドを管理しますか?

アルフィーの態度
Flux.just("a", "b", "c")
        .log(null, Level.INFO,true) // line 18
        .flatMap(value -> Mono.just(value.toUpperCase())
                              .publishOn(Schedulers.elastic()), 2)
        .log(null, Level.INFO,true) // line 21
        .subscribe();

出力の一部:

13:03:46 [main] INFO  - | request(2)    Flux.log(App.java:18)
13:03:46 [main] INFO  - | onNext(a)     Flux.log(App.java:18)
13:03:46 [main] INFO  - | onNext(b)     Flux.log(App.java:18)
13:03:46 [elastic-2] INFO  - onNext(A)  Flux.log(App.java:21)
13:03:46 [elastic-2] INFO  - | request(1)   Flux.log(App.java:18)
13:03:46 [main] INFO  - | onNext(c)     Flux.log(App.java:18)
13:03:46 [elastic-3] INFO  - onNext(B)  Flux.log(App.java:21)
13:03:46 [elastic-3] INFO  - | request(1)   Flux.log(App.java:18)
13:03:46 [elastic-2] INFO  - onNext(C)  Flux.log(App.java:21)
13:03:46 [elastic-2] INFO  - | request(1)   Flux.log(App.java:18)
13:03:46 [main] INFO  - | onComplete()  Flux.log(App.java:18)
13:03:46 [main] INFO  - onComplete()    Flux.log(App.java:21)

質問:

  1. スレッドflatMapから2つの要素をmain要求してから、他のスレッドからさらに要素を要求するのはなぜですか?

  2. スレッドでsubscribe処理されないのはなぜmainですか?

ドクカ

なぜ2がメインスレッドから要求されたのですか?

最初のSubscription.request量は、指定した同時実行レベル(2。)によって異なります.subscribeメインスレッドを呼び出すため、最初のprefetchリクエストはそのスレッドで正確に呼び出されます。

次のスキーマを見てみましょう。

.subscribe()[Thread main]-> FluxLog.source.subscribe()[Tread Main]-> FluxFlatMap.source.subscribe()[ThreadMain]-> FluxJust.subscriber.onSubscribe()->FluxFlatMap.subscription.request(concurrency)[Thread Main]

次は何が起こる?

そして、その時点からハードコアになります:)。内側のストリームがサブスクライブさFlatMapInnerれるため、(onNext、onError、onComplete)のすべてのシグナルがScheduler.elastic(のために)監視されます.publishOn次に、内部ストリームが完了すると、そのFlatMapInnner上で、メカニズム全体のドライバーであるonCompleteメインFlatMapMain通知しflatMapます。間の相互作用FlatMapInnerFlatMapMainオーバーされていますFlatMapMain.innerCompleteFlatMapMainの観点からは、internalFlatMapInnerがの役割を果たしているためQueue、すべての要素は次のようになります。drained落ち着いて、ここで何が起こっているのかわからなくても慌てないでください。この方法のすべてのアイデアは、内部ストリームからデータを排出してダウンストリームに送信し、データの新しい部分をアップストリームに要求することです。覚えておくべきことは、innerComplete呼び出されFlatMapInner.onCompleteた場所から別のスケジューラーに移動されたということです。つまり、nextSubscription.requestはで指定されたスレッドから呼び出されるということです。Mono.just(value.toUpperCase()).publishOn(Schedulers.elastic())

したがって、概略的には、そのプロセスは次のようになります。

FluxFlatMap.FlatMapMain.onNext [Thread Main]-> Publisher m = mapper(...)-> m.subscribe(new FluxFlatMap.FlatMapInner())-> FluxFlatMap.FlatMapInner.onNext("a") [Thread Elastic N]-> LambdaSubscriber.onNext("c") [Thread Elastic N]-> FluxFlatMap.FlatMapInner.onComplete() [Thread Elastic N]-> FluxFlatMap.FlatMapMain.drainLoop() [Thread Elastic N]-> FluxFlatMap.FlatMapMain.drainLoop() [Thread Elastic N] { ... subscription.request(amountOfCompletedInners)-> FlatMap.FlatMapMain.onNext() [Thread Elastic N]-> ....- LambdaSubscriber.onNext("c") [Thread Elastic N]>...。

したがって、メインに最初のrequest(2)が表示され、次にエラスティックからのrequest(1)が表示されます(1つの内部が完了しているため、FlatMapは並行性の要求を満たすためにアップストリームから別の1つの要素を要求します)。

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

分類Dev

Nettyはスレッドプールをどのように使用しますか?

分類Dev

OpenMPはどのようにスレッドを再利用しますか

分類Dev

Java:ThreadFactoryはスレッドをどのように処理しますか?

分類Dev

Tomcatはどのように内部でスレッドを作成しますか?

分類Dev

dockerはコンテナのIPアドレスをどのように管理しますか?

分類Dev

複数のブランチ(キュー)をマージするときに、GStreamerはどのようにスレッドを管理しますか

分類Dev

Reactorで複数のスレッドを使用してflatMapを実行するにはどうすればよいですか?

分類Dev

スレッドはどのようにしてミューテックスロックを取得しますか?

分類Dev

スレッドはBegin / Async呼び出し(ソケットIOなど)に対してどのように管理されますか?

分類Dev

NPTLのスレッドはどのように終了しますか?

分類Dev

スレッドはどのようにスタック領域を占有しますか?

分類Dev

Ember Dataはどのようにして大量のレコードを管理しますか?

分類Dev

カーネルはスレッドをプロセスからどのように分離しますか

分類Dev

こんにちは、spring-kafkaはどのように消費者スレッドを処理しますか?

分類Dev

docker-composeはサービス名をIPアドレスにどのようにマップしますか?

分類Dev

Pythonスレッドはどのように機能しますか?

分類Dev

スレッドプールRejectedExecutionHandlerはどのように機能しますか

分類Dev

ExecutorServiceのスレッドをどのように制限しますか?

分類Dev

Golangはどのようにしてゴルーチン用のOSスレッドを実装しますか?

分類Dev

CPUは、どの物理アドレスがどの仮想アドレスにマップされているかをどのように認識しますか?

分類Dev

Javaスレッド:start()-どのようにして新しいスレッドを作成しますか?

分類Dev

どのようにScheduledExecutorServiceハンドルは、スレッドを終了しますか?

分類Dev

デリゲートはどのようにしてメソッドアドレスを取得できますか?

分類Dev

揮発性整数はこのスレッド同期の問題をどのように修正しますか?

分類Dev

.NET FrameworkはどのようにスレッドIDを割り当てますか?

分類Dev

next.js / reactはヘッドレスcmsからページをどのように作成しますか

分類Dev

どのようにJavaでスレッドを中断/停止しますか?

分類Dev

kworkerスレッド名をどのように解釈しますか?

分類Dev

jspはどのようにしてスレッドセーフを保証しますか?

Related 関連記事

  1. 1

    Nettyはスレッドプールをどのように使用しますか?

  2. 2

    OpenMPはどのようにスレッドを再利用しますか

  3. 3

    Java:ThreadFactoryはスレッドをどのように処理しますか?

  4. 4

    Tomcatはどのように内部でスレッドを作成しますか?

  5. 5

    dockerはコンテナのIPアドレスをどのように管理しますか?

  6. 6

    複数のブランチ(キュー)をマージするときに、GStreamerはどのようにスレッドを管理しますか

  7. 7

    Reactorで複数のスレッドを使用してflatMapを実行するにはどうすればよいですか?

  8. 8

    スレッドはどのようにしてミューテックスロックを取得しますか?

  9. 9

    スレッドはBegin / Async呼び出し(ソケットIOなど)に対してどのように管理されますか?

  10. 10

    NPTLのスレッドはどのように終了しますか?

  11. 11

    スレッドはどのようにスタック領域を占有しますか?

  12. 12

    Ember Dataはどのようにして大量のレコードを管理しますか?

  13. 13

    カーネルはスレッドをプロセスからどのように分離しますか

  14. 14

    こんにちは、spring-kafkaはどのように消費者スレッドを処理しますか?

  15. 15

    docker-composeはサービス名をIPアドレスにどのようにマップしますか?

  16. 16

    Pythonスレッドはどのように機能しますか?

  17. 17

    スレッドプールRejectedExecutionHandlerはどのように機能しますか

  18. 18

    ExecutorServiceのスレッドをどのように制限しますか?

  19. 19

    Golangはどのようにしてゴルーチン用のOSスレッドを実装しますか?

  20. 20

    CPUは、どの物理アドレスがどの仮想アドレスにマップされているかをどのように認識しますか?

  21. 21

    Javaスレッド:start()-どのようにして新しいスレッドを作成しますか?

  22. 22

    どのようにScheduledExecutorServiceハンドルは、スレッドを終了しますか?

  23. 23

    デリゲートはどのようにしてメソッドアドレスを取得できますか?

  24. 24

    揮発性整数はこのスレッド同期の問題をどのように修正しますか?

  25. 25

    .NET FrameworkはどのようにスレッドIDを割り当てますか?

  26. 26

    next.js / reactはヘッドレスcmsからページをどのように作成しますか

  27. 27

    どのようにJavaでスレッドを中断/停止しますか?

  28. 28

    kworkerスレッド名をどのように解釈しますか?

  29. 29

    jspはどのようにしてスレッドセーフを保証しますか?

ホットタグ

アーカイブ