通过使用Reactive Extensions,我创建了一个值的滚动缓冲区,该值在数据流中缓存了一小段最近值的历史记录,供绘图应用程序使用。由于这些值的到达速度比我感兴趣的显示速度快得多,因此我想在我的反应式管道中使用Sample(Timespan span)方法来减慢速度。但是,将其添加到下面的示例中会使WriteEnumerable方法中的某个位(集合被修改)之后引发Exception。显然,这是与Sample有关的线程问题,但是我对如何减轻它感到困惑。我尝试将计划程序设置为在Sample方法中使用无济于事。
有什么建议吗?
class Program
{
static void Main(string[] args)
{
Observable.Interval(TimeSpan.FromSeconds(0.1))
.Take(500)
.TimedRollingBuffer(TimeSpan.FromSeconds(10))
.Sample(TimeSpan.FromSeconds(0.5))
.Subscribe(frame => WriteEnumerable(frame));
var input = "";
while (input != "exit")
{
input = Console.ReadLine();
}
}
private static void WriteEnumerable<T>(IEnumerable<T> enumerable)
{
foreach (T thing in enumerable)
Console.WriteLine(thing + " " + DateTime.UtcNow);
Console.WriteLine(Environment.NewLine);
}
}
public static class Extensions
{
public static IObservable<IEnumerable<Timestamped<T>>> TimedRollingBuffer<T>(this IObservable<T> observable, TimeSpan timeRange)
{
return Observable.Create<IEnumerable<Timestamped<T>>>(
o =>
{
var queue = new Queue<Timestamped<T>>();
return observable.Timestamp().Subscribe(
tx =>
{
queue.Enqueue(tx);
DateTime now = DateTime.Now;
while (queue.Peek().Timestamp < now.Subtract(timeRange))
queue.Dequeue();
o.OnNext(queue);
},
ex => o.OnError(ex),
() => o.OnCompleted()
);
});
}
}
贷项到期的地方:被动扩展滑动时间窗口
例外是由于您在枚举队列时正在修改队列。
RollingBuffer
@Enigmativity引用的实现是正确的事情-您将在他的实现中注意到OnNext
,通过ToArray()
确保列表的副本被分发给观察者而不是变异的原始对象来调用。
在您的情况下,Sample
引入并发引入(这很好-这是应该做的)-但是,您正在传递队列本身,由于引入的并发,在枚举发生时该队列会发生变化。这是一个错误。
在您的中TimedRollingBuffer
,如果要使用o.OnNext(queue.ToArray())
而不是,o.OnNext(queue)
则不会有此问题。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句