リアクティブMongoDBを使用してWebfluxのMonoでn回操作を実行します

サイモン

リアクティブMongoDBドライバーとSpringを使用するWebfluxアプリケーションのデータローダーのシナリオは次のとおりです。

  1. タイプBのXオブジェクトを作成します
  2. タイプAのYオブジェクトを作成します。オブジェクトAには、タイプ配列のフィールドとタイプBのオブジェクトへの参照が含まれます。Bへの参照は、最初のステップからランダムに選択されます。
  3. 以前に作成したオブジェクトの配列にN個のエントリを追加します

私が直面している問題は、Mono / Fluxの並列実行であるように思われますが、これは私の理解からは起こらないはずです。ドキュメントによると、特に指定がない限り、物事は常に順番に実行されます。

誰かが私が間違っていることのヒントを教えてもらえますか?

コードスニペットの例を次に示します。オブジェクトAはトイレです。オブジェクトBはユーザーです。配列フィールドはコメントフィールドです。

Flux.range(0, 10)
            // create 10 objects of type user
            .flatMap {
                LOG.debug("Creating user $it")
                userRepository.save(
                    User(
                        id = ObjectId(),
                        name = userNames.random(),
                        email = "${userNames.random()}@mail.com"
                    )
                )
            }
            .collectList()
            // create 2 objects of type toilet
            .flatMapMany { userList ->
                Flux.range(0, 2).zipWith(Flux.range(0, 2).map { userList })
            }
            .flatMap {
                LOG.debug("Creating toilet ${it.t1}")
                val userList = it.t2

                toiletRepository.save(
                    Toilet(
                        id = ObjectId(),
                        title = userList.random().name
                    )
                )
            }
            // add 5 entries to array of toilet
            .flatMap { toilet ->
                Flux.range(0, 5).zipWith(Flux.range(0, 5).map { toilet })
            }
            .flatMap { tuple ->
                val toilet = tuple.t2
                LOG.debug("Creating comment ${tuple.t1} for toilet $toilet")

                // get current values from toilet
                toiletRepository.findById(toilet.id).map {
                    // and push a new element to the comments array
                    LOG.debug("Comment size ${it.commentRefs.size}")
                    toiletRepository.save(it.apply { commentRefs.add(ObjectId()) })
                }
            }
            .subscribe {
                GlobalScope.launch {
                    exitProcess(SpringApplication.exit(context))
                }
            }

このコードを実行すると、次のログが生成されます。

2020-11-15 19:42:54.197 DEBUG 13524 --- [           main] c.g.q.t.DataLoaderRunner                 : Creating user 0
2020-11-15 19:42:54.293 DEBUG 13524 --- [           main] c.g.q.t.DataLoaderRunner                 : Creating user 1
2020-11-15 19:42:54.295 DEBUG 13524 --- [           main] c.g.q.t.DataLoaderRunner                 : Creating user 2
2020-11-15 19:42:54.296 DEBUG 13524 --- [           main] c.g.q.t.DataLoaderRunner                 : Creating user 3
2020-11-15 19:42:54.300 DEBUG 13524 --- [           main] c.g.q.t.DataLoaderRunner                 : Creating user 4
2020-11-15 19:42:54.301 DEBUG 13524 --- [           main] c.g.q.t.DataLoaderRunner                 : Creating user 5
2020-11-15 19:42:54.304 DEBUG 13524 --- [           main] c.g.q.t.DataLoaderRunner                 : Creating user 6
2020-11-15 19:42:54.310 DEBUG 13524 --- [           main] c.g.q.t.DataLoaderRunner                 : Creating user 7
2020-11-15 19:42:54.316 DEBUG 13524 --- [           main] c.g.q.t.DataLoaderRunner                 : Creating user 8
2020-11-15 19:42:54.318 DEBUG 13524 --- [           main] c.g.q.t.DataLoaderRunner                 : Creating user 9
2020-11-15 19:42:54.348 DEBUG 13524 --- [tLoopGroup-3-10] c.g.q.t.DataLoaderRunner                 : Creating toilet 0
2020-11-15 19:42:54.380 DEBUG 13524 --- [tLoopGroup-3-10] c.g.q.t.DataLoaderRunner                 : Creating toilet 1
2020-11-15 19:42:54.386 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner                 : Creating comment 0 for toilet Toilet(id=5fb176aef24f4c248fbb051c, title=wholesale, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.405 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner                 : Creating comment 1 for toilet Toilet(id=5fb176aef24f4c248fbb051c, title=wholesale, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.406 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner                 : Creating comment 2 for toilet Toilet(id=5fb176aef24f4c248fbb051c, title=wholesale, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.407 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner                 : Creating comment 3 for toilet Toilet(id=5fb176aef24f4c248fbb051c, title=wholesale, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.409 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner                 : Creating comment 4 for toilet Toilet(id=5fb176aef24f4c248fbb051c, title=wholesale, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.410 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner                 : Creating comment 0 for toilet Toilet(id=5fb176aef24f4c248fbb051d, title=imaginary, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.412 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner                 : Creating comment 1 for toilet Toilet(id=5fb176aef24f4c248fbb051d, title=imaginary, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.413 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner                 : Creating comment 2 for toilet Toilet(id=5fb176aef24f4c248fbb051d, title=imaginary, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.414 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner                 : Creating comment 3 for toilet Toilet(id=5fb176aef24f4c248fbb051d, title=imaginary, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.415 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner                 : Creating comment 4 for toilet Toilet(id=5fb176aef24f4c248fbb051d, title=imaginary, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.425 DEBUG 13524 --- [tLoopGroup-3-10] c.g.q.t.DataLoaderRunner                 : Comment size 0
2020-11-15 19:42:54.425 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner                 : Comment size 0
2020-11-15 19:42:54.425 DEBUG 13524 --- [ntLoopGroup-3-6] c.g.q.t.DataLoaderRunner                 : Comment size 0
2020-11-15 19:42:54.425 DEBUG 13524 --- [ntLoopGroup-3-2] c.g.q.t.DataLoaderRunner                 : Comment size 0
2020-11-15 19:42:54.425 DEBUG 13524 --- [ntLoopGroup-3-3] c.g.q.t.DataLoaderRunner                 : Comment size 0
2020-11-15 19:42:54.425 DEBUG 13524 --- [ntLoopGroup-3-9] c.g.q.t.DataLoaderRunner                 : Comment size 0
2020-11-15 19:42:54.425 DEBUG 13524 --- [ntLoopGroup-3-7] c.g.q.t.DataLoaderRunner                 : Comment size 0
2020-11-15 19:42:54.429 DEBUG 13524 --- [ntLoopGroup-3-2] c.g.q.t.DataLoaderRunner                 : Comment size 0
2020-11-15 19:42:54.429 DEBUG 13524 --- [ntLoopGroup-3-9] c.g.q.t.DataLoaderRunner                 : Comment size 0
2020-11-15 19:42:54.464 DEBUG 13524 --- [tLoopGroup-3-10] c.g.q.t.DataLoaderRunner                 : Comment size 0

私は今3つの質問があります:

  1. スレッドがメインからループグループに切り替わるのはなぜですか?シーケンスで実行される場合は、マルチスレッドをまったく使用しないでください。
  2. Comment sizeログメッセージが最後にグループ化されるのはなぜですか?
  3. リアクティブmongoリポジトリの実装を使用して要素を配列に正しくプッシュする方法は?

ヒントは大歓迎です。私は、ネストされた実行することを前提とfindByIdしてsave正しくないですが、どのように違ったことを書きますか?以来はsave、私は、配列内の1つの追加的な要素が含まれているエンティティの最新バージョンに渡す必要が実体を必要とします。で最新バージョンを取得し、findById「マップ->保存」で直接変更することでそれを達成しようとしています。

皆さん、ありがとうございました!

サイモン

これが最善の方法かどうかはわかりませんが、関数内の操作を分割してより論理的にグループ化することで、目的を達成することができました。

次の操作のスニペットは次のとおりです。

  1. ユーザーを作成する
  2. コメントを作成する
  3. 評価を作成する
private fun createUsers() = Flux.range(0, userNames.size + 1)
        .flatMap {
            if (it < userNames.size) {
                LOG.debug("Creating user $it")
                userRepository.save(
                    User(
                        id = ObjectId(),
                        name = userNames[it],
                        email = "${userNames[it]}@mail.com"
                    )
                )
            } else {
                LOG.debug("Creating dev-user")
                userRepository.save(
                    User(
                        id = ObjectId("000000000000012343456789"),
                        name = "devuser",
                        email = "[email protected]"
                    )
                )
            }
        }
        .collectList()
private fun createComments(users: List<User>) = Flux.range(0, numComments)
        .flatMap {
            LOG.debug("Creating comment $it")
            commentRepository.save(
                Comment(
                    id = ObjectId(),
                    text = commentTexts.random(),
                    userRef = users.random().id
                )
            )
        }
        .collectList()
private fun createRatings(users: List<User>) = Flux.range(0, numRatings)
        .flatMap {
            LOG.debug("Creating rating $it")
            ratingRepository.save(
                Rating(
                    id = ObjectId(),
                    userRef = users.random().id,
                    value = Random.nextInt(0, 5)
                )
            )
        }
        .collectList()

そして最後に、上記の結果でトイレを作成します。

private fun createToilets(comments: List<Comment>, ratings: List<Rating>) = Flux.range(0, numToilets)
        .flatMap {
            val toilet = Toilet(
                id = ObjectId(),
                title = titles.random(),
                location = GeoJsonPoint(Random.nextDouble(10.0, 20.0), Random.nextDouble(45.0, 55.0)),
                description = descriptions.random()
            )

            // add comments
            val commentsToAdd = Random.nextInt(0, comments.size)
            for (i in 0 until commentsToAdd) {
                toilet.commentRefs.add(comments[i].id)
            }

            // add average rating and rating references
            val ratingsToAdd = Random.nextInt(0, ratings.size)
            for (i in 0 until ratingsToAdd) {
                toilet.ratingRefs.add(ratings[i].id)
                toilet.averageRating += ratings[i].value
            }
            if (toilet.ratingRefs.isNotEmpty()) {
                toilet.averageRating /= toilet.ratingRefs.size
            }

            LOG.debug("Creating toilet $it with $commentsToAdd comments and $ratingsToAdd ratings")

            toiletRepository.save(toilet)
        }
        // upload preview image
        .flatMap { toilet ->
            val imageName = "toilet${Random.nextInt(1, 10)}.jpg"
            imageService.store(
                Callable {
                    DataLoaderRunner::class.java.getResourceAsStream("/sample-images/$imageName")
                },
                "${toilet.title}-preview"
            ).zipWith(Mono.just(toilet))
        }
        // set preview image
        .flatMap {
            val imageId = it.t1
            val toilet = it.t2

            toiletRepository.save(toilet.copy(previewID = imageId))
        }
        .collectList()

これが最終的なリアクティブオペレーションチェーンです。

createUsers()
            .flatMap { users ->
                createComments(users).map { comments ->
                    Tuples.of(users, comments)
                }
            }
            .flatMap {
                val users = it.t1
                val comments = it.t2

                createRatings(users).map { ratings ->
                    Tuples.of(comments, ratings)
                }
            }
            .flatMap {
                val comments = it.t1
                val ratings = it.t2

                createToilets(comments, ratings)
            }
            // close application when all toilets are processed
            .subscribe {
                GlobalScope.launch {
                    exitProcess(SpringApplication.exit(context))
                }
            }

これが最善の方法かどうかはわかりませんが、機能しています。冒頭の投稿でのアプローチは、ネストされたマップ/フラットマップ操作を使用していますが、これはとにかく避ける必要があり、おそらくそれらが機能しなかった理由です。

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

分類Dev

Spring WebFlux:Spring Data MongoDBリアクティブリポジトリのnull値で例外を発行しますか?

分類Dev

アクティブレコードを使用してクエリを実行する場所で、文字列の最初の2文字を無視します

分類Dev

複数のリアクティブ操作を実行する正しい方法

分類Dev

mongodbjavaドライバーを使用してネイティブmongodbクエリを実行します

分類Dev

Ruby onRailsのアクティブレコードを使用してSQL操作を実行する

分類Dev

Clojure:出力を次の操作の入力として使用して操作をn回実行します(a-la reduce)

分類Dev

Spring WebFluxを使用してリアクティブなメモリ内リポジトリをどのように構築しますか?

分類Dev

Shinyでリアクティブ値を使用してsqldfでSQLを実行する

分類Dev

リアクティブmongoDBを使用してMicronautでTextIndexを作成する

分類Dev

Shinyでリアクティブオブジェクトの実行を停止/再開しますか?

分類Dev

Reactor / WebFluxはリアクティブhttpニュースティッカーを実装します

分類Dev

Angularでは、ui-sortableを使用してテンプレートをコンパイルし、接続されたリストで2回実行するディレクティブは、両方のリスト間でドラッグアンドドロップできません

分類Dev

Bashを使用してすべてのサブディレクトリでアクションを実行します

分類Dev

元の値を実行して渡すリアクティブ関数はありますか?

分類Dev

JPARespositoryを使用して3つのプロパティでクエリを実行します

分類Dev

ライブラリを2回実行して、jupyterでインタラクティブなプロットを作成します

分類Dev

Hangfireは、非アクティブの場合でもSQLクエリを実行し続けます

分類Dev

AndroidでEspressoを使用して2番目のテストを実行する前に、すべてのアクティビティを強制終了します

分類Dev

私は何が欠けていますか?axiosを使用して削除リクエストを実行し、mongodbをbakcendとして使用してreq.params.idでトラブルシューティングします

分類Dev

すべてのサブディレクトリでgitpullを実行します

分類Dev

すべてのサブディレクトリでgitpullを実行します

分類Dev

すべてのサブディレクトリでgitpullを実行します

分類Dev

Hibernate:ネイティブの一括操作クエリを実行できませんでした

分類Dev

リモートディスプレイをアクティブにして、Xアプリケーションをリモートで実行します

分類Dev

2回目のクリックで、JQUERYを使用して別の関数を実行します

分類Dev

.netSDKを使用してAzureData FactoryV2でアクティビティを再実行します

分類Dev

nosetest-サブディレクトリですべてのテストを実行します

分類Dev

IntentServiceを使用して実行中のアクティビティを更新します

分類Dev

アプリがアクティブでない/実行されていないときにアクションを実行します

Related 関連記事

  1. 1

    Spring WebFlux:Spring Data MongoDBリアクティブリポジトリのnull値で例外を発行しますか?

  2. 2

    アクティブレコードを使用してクエリを実行する場所で、文字列の最初の2文字を無視します

  3. 3

    複数のリアクティブ操作を実行する正しい方法

  4. 4

    mongodbjavaドライバーを使用してネイティブmongodbクエリを実行します

  5. 5

    Ruby onRailsのアクティブレコードを使用してSQL操作を実行する

  6. 6

    Clojure:出力を次の操作の入力として使用して操作をn回実行します(a-la reduce)

  7. 7

    Spring WebFluxを使用してリアクティブなメモリ内リポジトリをどのように構築しますか?

  8. 8

    Shinyでリアクティブ値を使用してsqldfでSQLを実行する

  9. 9

    リアクティブmongoDBを使用してMicronautでTextIndexを作成する

  10. 10

    Shinyでリアクティブオブジェクトの実行を停止/再開しますか?

  11. 11

    Reactor / WebFluxはリアクティブhttpニュースティッカーを実装します

  12. 12

    Angularでは、ui-sortableを使用してテンプレートをコンパイルし、接続されたリストで2回実行するディレクティブは、両方のリスト間でドラッグアンドドロップできません

  13. 13

    Bashを使用してすべてのサブディレクトリでアクションを実行します

  14. 14

    元の値を実行して渡すリアクティブ関数はありますか?

  15. 15

    JPARespositoryを使用して3つのプロパティでクエリを実行します

  16. 16

    ライブラリを2回実行して、jupyterでインタラクティブなプロットを作成します

  17. 17

    Hangfireは、非アクティブの場合でもSQLクエリを実行し続けます

  18. 18

    AndroidでEspressoを使用して2番目のテストを実行する前に、すべてのアクティビティを強制終了します

  19. 19

    私は何が欠けていますか?axiosを使用して削除リクエストを実行し、mongodbをbakcendとして使用してreq.params.idでトラブルシューティングします

  20. 20

    すべてのサブディレクトリでgitpullを実行します

  21. 21

    すべてのサブディレクトリでgitpullを実行します

  22. 22

    すべてのサブディレクトリでgitpullを実行します

  23. 23

    Hibernate:ネイティブの一括操作クエリを実行できませんでした

  24. 24

    リモートディスプレイをアクティブにして、Xアプリケーションをリモートで実行します

  25. 25

    2回目のクリックで、JQUERYを使用して別の関数を実行します

  26. 26

    .netSDKを使用してAzureData FactoryV2でアクティビティを再実行します

  27. 27

    nosetest-サブディレクトリですべてのテストを実行します

  28. 28

    IntentServiceを使用して実行中のアクティビティを更新します

  29. 29

    アプリがアクティブでない/実行されていないときにアクションを実行します

ホットタグ

アーカイブ