我有一个asnyc
函数,希望在IObservable
序列中的每个观察上调用此函数,从而一次只能将事件传递给一个事件。消费者期望在飞行中不超过一条消息。如果我正确理解的话,这也是RX合同。
考虑以下示例:
static void Main() {
var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
//var d = ob.Subscribe(async x => await Consume(x)); // Does not rate-limit.
var d = ob.Subscribe(x => Consume(x).Wait());
Thread.Sleep(10000);
d.Dispose();
}
static async Task<Unit> Consume(long count) {
Console.WriteLine($"Consuming {count} on thread {Thread.CurrentThread.ManagedThreadId}");
await Task.Delay(750);
Console.WriteLine($"Returning on thread {Thread.CurrentThread.ManagedThreadId}");
return Unit.Default;
}
该Consume
函数伪造了750毫秒的处理时间,并ob
每100毫秒产生一次事件。上面的代码有效,但是task.Wait()
在随机线程上调用。如果我改为按照注释行3中的指示进行订阅,那么Consume
将以ob
产生事件的相同速率进行调用(而且我什至无法理解Subscribe
此注释语句中正在使用的重载内容,因此可能是无稽之谈)。
那么,如何正确地一次将一个事件从一个可观察的序列传递给一个async
函数呢?
订阅服务器不应长时间运行,因此不支持在订阅处理程序中执行长时间运行的异步方法。
相反,应将您的异步方法视为可从另一个序列获取值的单个值可观察序列。现在您可以编写序列,这是Rx设计要做的。
现在您已经实现了这一飞跃,您可能会遇到类似@Reijher在Howto中从rx subscription回调异步函数中创建的内容?。
他的代码分解如下。
//The input sequence. Produces values potentially quicker than consumer
Observable.Interval(TimeSpan.FromSeconds(1))
//Project the event you receive, into the result of the async method
.Select(l => Observable.FromAsync(() => asyncMethod(l)))
//Ensure that the results are serialized
.Concat()
//do what you will here with the results of the async method calls
.Subscribe();
在这种情况下,您正在创建隐式队列。在生产者比消费者快的任何问题中,都需要在等待时使用队列来收集值。就我个人而言,我更喜欢通过将数据放入队列来使其明确。或者,您可以显式地使用Scheduler来发出信号,该信号应该是应该吸收松弛的线程模型。
对于Rx新手来说,这似乎是一个流行的障碍(在订阅处理程序中执行异步)。指导原则不将其放入订阅服务器的原因有很多,例如:1.破坏错误模型2.混合异步模型(此处为rx,此处为任务)3.订阅是以下内容的使用者异步序列。异步方法只是单个值序列,因此该视图不能成为序列的末尾,但是结果可能是这样。
更新
为了说明有关破坏错误模型的注释,这里是对OP示例的更新。
void Main()
{
var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
var d = ob.Subscribe(
x => ConsumeThrows(x).Wait(),
ex=> Console.WriteLine("I will not get hit"));
Thread.Sleep(10000);
d.Dispose();
}
static async Task<Unit> ConsumeThrows(long count)
{
return await Task.FromException<Unit>(new Exception("some failure"));
//this will have the same effect of bringing down the application.
//throw new Exception("some failure");
}
在这里我们可以看到,如果OnNext
要抛出处理程序,那么我们将不受RxOnError
处理程序的保护。该异常将无法处理,并且很有可能导致应用程序崩溃。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句