订阅具有异步功能的可观察序列

千公里

我有一个asnyc函数,希望在IObservable序列中的每个观察上调用函数,从而一次只能将事件传递给一个事件。消费者期望在飞行中不超过一条消息。如果我正确理解的话,这也是RX合同。

考虑以下示例:

static void Main() {
  var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
  //var d = ob.Subscribe(async x => await Consume(x));  // Does not rate-limit.
  var d = ob.Subscribe(x => Consume(x).Wait());
  Thread.Sleep(10000);
  d.Dispose();
}

static async Task<Unit> Consume(long count) {
  Console.WriteLine($"Consuming {count} on thread {Thread.CurrentThread.ManagedThreadId}");
  await Task.Delay(750);
  Console.WriteLine($"Returning on thread {Thread.CurrentThread.ManagedThreadId}");
  return Unit.Default;
}

Consume函数伪造了750毫秒的处理时间,并ob每100毫秒产生一次事件。上面的代码有效,但是task.Wait()在随机线程上调用如果我改为按照注释行3中的指示进行订阅,那么Consume将以ob产生事件的相同速率进行调用(而且我什至无法理解Subscribe此注释语句中正在使用的重载内容,因此可能是无稽之谈)。

那么,如何正确地一次将一个事件从一个可观察的序列传递给一个async函数呢?

李·坎贝尔

订阅服务器不应长时间运行,因此不支持在订阅处理程序中执行长时间运行的异步方法。

相反,应将您的异步方法视为可从另一个序列获取值的单个值可观察序列。现在您可以编写序列,这是Rx设计要做的。

现在您已经实现了这一飞跃,您可能会遇到类似@Reijher在Howto中从rx subscription回调异步函数中创建的内容

他的代码分解如下。

//The input sequence. Produces values potentially quicker than consumer
Observable.Interval(TimeSpan.FromSeconds(1))
      //Project the event you receive, into the result of the async method
      .Select(l => Observable.FromAsync(() => asyncMethod(l)))
      //Ensure that the results are serialized
      .Concat()
      //do what you will here with the results of the async method calls
      .Subscribe();

在这种情况下,您正在创建隐式队列。在生产者比消费者快的任何问题中,都需要在等待时使用队列来收集值。就我个人而言,我更喜欢通过将数据放入队列来使其明确。或者,您可以显式地使用Scheduler来发出信号,该信号应该是应该吸收松弛的线程模型。

对于Rx新手来说,这似乎是一个流行的障碍(在订阅处理程序中执行异步)。指导原则不将其放入订阅服务器的原因有很多,例如:1.破坏错误模型2.混合异步模型(此处为rx,此处为任务)3.订阅是以下内容的使用者异步序列。异步方法只是单个值序列,因此该视图不能成为序列的末尾,但是结果可能是这样。

更新

为了说明有关破坏错误模型的注释,这里是对OP示例的更新。

void Main()
{
    var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
    var d = ob.Subscribe(
        x => ConsumeThrows(x).Wait(),
        ex=> Console.WriteLine("I will not get hit"));

    Thread.Sleep(10000);
    d.Dispose();
}

static async Task<Unit> ConsumeThrows(long count)
{
    return await Task.FromException<Unit>(new Exception("some failure"));
    //this will have the same effect of bringing down the application.
    //throw new Exception("some failure");
}

在这里我们可以看到,如果OnNext要抛出处理程序,那么我们将不受RxOnError处理程序的保护该异常将无法处理,并且很有可能导致应用程序崩溃。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

可观察的。带有异步谓词的地方

来自分类Dev

具有异步功能的JavaScript数组

来自分类Dev

具有异步功能的OracleCommand ExecuteNonQuery

来自分类Dev

具有异步功能的装饰器

来自分类Dev

具有异步功能的意外令牌

来自分类Dev

为什么订阅相同序列的不同可观察对象时,ReplySubject具有不同的行为?

来自分类Dev

具有地图角度的管道可观察订阅

来自分类Dev

具有异步功能的非阻塞I / O

来自分类Dev

具有异步功能执行的PySide应用程序

来自分类Dev

具有异步/等待功能的npm mv模块

来自分类Dev

如果其他可观察对象具有映射功能,则转换可观察对象

来自分类Dev

具有特定“可观察/订阅者”关系的观察者模式

来自分类Dev

Angular / angularfire2-读取可观察的文档并将数据保存到对象。没有异步管道

来自分类Dev

可观察的订阅没有被调用

来自分类Dev

具有异步数据库读取功能的DialogFlow Firebase云功能

来自分类Dev

如果更改可观察对象,异步管道是否会自动取消订阅可观察对象?

来自分类Dev

具有异步请求的NodeJS

来自分类Dev

具有异步结构的openxml

来自分类Dev

具有异步/等待的递归setTimeout

来自分类Dev

具有异步/等待的递归setTimeout

来自分类Dev

具有异步/等待的递归setTimeout

来自分类Dev

具有异步请求的递归函数

来自分类Dev

具有异步方法的RxJava for循环?

来自分类Dev

订阅可观察值

来自分类Dev

可观察的过载订阅

来自分类Dev

订阅可观察值

来自分类Dev

ForEach(内部具有异步功能)完成后的回调

来自分类Dev

从具有异步功能的Node.js模块返回值

来自分类Dev

如何在具有异步功能Swift的循环中使用调度组?

Related 相关文章

热门标签

归档