'merge'を使用して、オブザーバブルのリストの同時実行を制限するにはどうすればよいですか?ただし、すべてのオブザーバブルが完了した後にのみ戻りますか?

コナー・オドハティ

問題:

URLのリストがあります。URLを使用してファイルを取得し、ダウンロードして、ローカルに保存するObservableメソッドがあります。これらのリクエストを並行して起動したいのですが、一度に許可するスレッドは4つだけです(PDFサーバー側を生成し、負荷を減らしたい)。さらに、すべてのURLの場所がダウンロードされたら、このダウンロード手順から戻る必要があります。

現在の解決策

今は、リクエストを一度に起動してforkJoinを使用しています。数日検索した後、私はここでいくつかのアイデアを与えてくれるいくつかの解決策に出くわしましたが、それらは私が望むものを正確に実行しません。私の主な情報源はここにあります

export function limitedParallelObservableExecution<T>(listOfItems: Array<T>, observableMethod: (item: T) => Observable<any>): Observable<any> {
  const MAX_CONCURRENCY = 4;
  if (listOfItems && listOfItems.length > 0) {
    let observableListOfItems: Observable<Observable<any>> = Observable.from(listOfItems).map(
      (item: T) => observableMethod(item)
    );
    return observableListOfItems.merge(MAX_CONCURRENCY);
  } else {
    return Observable.of({});
  }
}

このステップが完了したら実行するためにflatMappedである別のダウンロードステップがあります。ただし、次のステップは1回だけ実行するのではなく、リスト内のURLごとに1回実行します(私が理解しているように、これは、完了するURLごとに1回発行されるためです)。

すべてのダウンロードが完了したときに1回だけ戻るときに、この同時実行性を維持するにはどうすればよいですか?

さらに、これでもすべてのリクエストを一度に起動できるようです。同時リクエストの数を制限するためのより良い方法はありますか?同様に、n個のリクエストを並行して起動しますが、最初のn個のうち1回だけn + 1を起動しますか?

追加のコード例

これは、前のダウンロードステップが完了したときにのみ各ダウンロードステップを起動する方法を示すコードスニペットです。

).flatMap(
  (uploadFlightActualsSuccess) => {
    this.changeProgressValue(this.FLIGHT_ACTUALS_UPLOAD_END); 
    return this.syncDocuments();
  }
).flatMap(
  (syncDocumentsSuccess) => {
    this.changeProgressValue(this.OPERATOR_DOCUMENT_DOWNLOAD_END);
    return this.syncTripDocuments()
  },
  (error) => error
).flatMap(
  (syncTripDocumentsSuccess) => {
    this.changeProgressValue(this.TRIP_DOCUMENT_DOWNLOAD_END);      
    return this.expenseItemSyncProvider.syncPortalData();
  }
).flatMap(
  (expenseItemSyncSuccess) => {
    return this.flightPersonnelSyncProvider.syncFlightPersonnelByTrip();
  }
).flatMap(

「syncTripDocuments」は、URLのリストをダウンロードするリクエストです。これらがすべて完了したら、次のステップに進みたいと思います。

コナー・オドハティ

投稿されたソリューションの問題は(並行性の柔軟性を与えてくれますが)、すべてのアイテムが完了すると、アクション全体を1回だけ発行する必要があるという条件を満たしていないことでした。

実用的な解決策は次のとおりです。

import { toArray, mergeMap } from "rxjs/operators";
import { of, from, Observable } from "rxjs";

export function limitedParallelObservableExecution<T>(
    listOfItems: Array<T>,
    observableMethod: (item: T) => Observable<any>,
    maxConcurrency: number = 4
): Observable<any> {
    if (listOfItems && listOfItems.length > 0) {
        let observableListOfItems: Observable<T> = from(listOfItems);
        return observableListOfItems.pipe(
            mergeMap(observableMethod, maxConcurrency),
            toArray()
       );
    } else {
        return of({});
    }
}

ここでの戦略は次のとおりです。

1)アイテムのリストから監視可能なストリームを作成します

2)maxConcurrencyとともにobservableメソッドをmergeMapに渡します

3)toArray()を使用して、戻る前にすべてのオブザーバブルが完了していることを確認します

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

Related 関連記事

ホットタグ

アーカイブ