RXJSで、retryWhenを使用してbindNodeCallbackで作成されたObservableの再実行を強制するにはどうすればよいですか?

機会

Subjectオブジェクトを受け取りでラップした関数を呼び出すがありますbindNodeCallbackretryWhenエラーが原因で失敗した場合、または結果がfalseに戻った場合は、を使用してその関数の実行を再試行したいと思います。

私はいくつかの異なるアプローチを試しましたが、バインドされた関数を再び起動させることに成功していません。

私が持ってここにcodesandboxのセットアップを

function fakeSend(
  task: string,
  cb: (err: Error | null, result?: boolean) => void
) {
  console.log("fakesend", task);

  setTimeout(() => {
    const hasError = Math.random() < 0.5;
    const res = Math.random() < 0.5;
    console.log(hasError ? "hasError" : `responding with ${res}`);
    if (hasError) {
      return cb(new Error("error"));
    }
    return cb(null, res);
  }, 100);
}

const boundSend = bindNodeCallback(fakeSend);
const subject = new Subject<string>();

subject.subscribe(
  (task) => {
    boundSend(task)
      .pipe(
        tap((status) => {
          if (!status) {
            throw new Error("Did not send");
          }
          return status;
        }),
        retryWhen((errs) =>
          errs.pipe(
            delay(1000),
            tap((err) => console.log)
          )
        )
      )
      .subscribe({
        next: console.log,
        error: console.error,
        complete: () => {
          console.log("complete", task);
        }
      });
  },
  (error) => {
    console.log("error in subject subscribe");
  }
);

subject.next("test1");
subject.next("test2");
subject.next("test3");
subject.next("test4");

  

ありがとう!

const { Subject, bindNodeCallback } = rxjs;
const { tap, retryWhen, delay } = rxjs.operators;

function fakeSend(
  task,
  cb
) {
  console.log("fakesend", task);

  setTimeout(() => {
    const hasError = Math.random() < 0.5;
    const res = Math.random() < 0.5;
    console.log(hasError ? "hasError" : `responding with ${res}`);
    if (hasError) {
      return cb(new Error("error"));
    }
    return cb(null, res);
  }, 100);
}

const boundSend = bindNodeCallback(fakeSend);
const subject = new Subject();

subject.subscribe(
  (task) => {
    boundSend(task)
      .pipe(
        tap((status) => {
          console.log('tap');
          if (!status) {
            throw new Error("Did not send");
          }
          return status;
        }),
        retryWhen((errs) =>
          errs.pipe(
            delay(1000),
            tap((err) => console.log)
          )
        )
      )
      .subscribe({
        next: console.log,
        error: console.error,
        complete: () => {
          console.log("complete", task);
        }
      });
  },
  (error) => {
    console.log("error in subject subscribe");
  }
);

subject.next("test1");
subject.next("test2");
subject.next("test3");
subject.next("test4");
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.2/rxjs.umd.min.js"></script>

編集

私はこれで動作しました:

subject.subscribe(
  (task) => {
    of(task).pipe(
      mergeMap((v) => boundSend(v)),
      tap((val) => {
        if (!val) throw new Error("Did not send");
      }),
      retryWhen((errs) => errs.pipe(delay(300)))
    ).subscribe();
  });

ただし、これが機能しない理由がわかりません。

subject
  .pipe(
    mergeMap((task) => boundSend(task)),
    tap((val) => {
      if (!val) throw new Error("Did not send");
    }),
    retryWhen((errs) => errs.pipe(delay(300)))
  )
  .subscribe();

Edit2

上手。私はこれがそれを説明していると思います:https//github.com/ReactiveX/rxjs/issues/1401

ここで私はそれが私がRX初心者だからだと思っていました:/

Picci

演算子を介してエラーが発生した場合にコールバックベースの関数を再試行できるようにするのnew Observableではなくコンストラクターを使用してみることができると思いますいくつかの詳細が続きます。bindNodeCallbackretryWhen

まず、このようなコンストラクターを使用してコールバックベースの関数からObservableを作成できますnew Observable

function newSend(task: string) {
  return new Observable(
    (subscriber: Subscriber<string>): TeardownLogic => {
      fakeSend(task, (err: Error | null, result?: string) => {
        if (err) {
          subscriber.error(err);
        } else {
          subscriber.next(result);
          subscriber.complete();
        }
      });
    }
  );
}

この関数newSendは、にfakeSend渡されたコールバックによって受信されたパラメーターに応じて、内部的に呼び出し、結果またはエラーを発行するObservableを返しますfakeSend

コールバックベースの関数をラップするObservableを作成する方法ができたので、次のようにmergeMapサブジェクトを介して生成された通知のストリームからコールバックの結果のストリームを作成するために使用できます。

subject
  .pipe(
    mergeMap((task) => {
      return newSend(task);
    })
  )
  .subscribe(
    (data) => console.log("Notification", data),
    (error) => {
      console.log("Error in final subscription", error);
    },
    () => console.log("DONE")
  );

subject.next("test1");
subject.next("test2");
subject.next("test3");
subject.next("test4");

subject.complete();  // added just to show that the complete function of the subscriber can be invoked

上記のコードを実行すると、サブジェクトのサブスクリプションfakeSendは、エラーが発生するまですべての成功した呼び出しをログに記録し、エラーが発生すると、エラーをログに記録して終了します(このstackblitzを参照)。

次に、このようなコールバックretryWhenでエラーが発生した場合に実際に再試行する演算子を追加できfakeSendます

subject
  .pipe(
    mergeMap((task) => {
      return newSend(task).pipe(
        retryWhen((errs) =>
          errs.pipe(
            delay(1000),
            tap((err) => console.log("Retry", err.message))
          )
        )
      );
    })
  )

この時点で、ロジックは完成です。fakeSendコールバックがエラーで呼び出されるたびに、retryWhenオペレーターはfakeSend同じパラメーターを使用して、への新しい呼び出しが実行されることを確認します。完全なコードは、このstackblitzで見ることができます

動作しない理由 bindNodeCallback

その理由は、の実装でbindNodeCallbackAsyncSubject、最後の結果をキャッシュし、後続のサブスクリプションでそれを再生するを内部的使用するためですしたがって、コールバックベースの関数がエラーになり、retryWhenオペレーターが再度サブスクライブすると、サブスクリプションは同じエラーを再度生成し、無限ループを開始します。これは(46行目と47行目)で置き換えるstackblitzで確認できます。newSend(task)boundSend(task)

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

Related 関連記事

ホットタグ

アーカイブ