我想根据Observable
列表的结果实时生成一个Futures
。
在最简单的情况下,假设我有一份正在运行的期货清单Future.sequence
,而我只是使用监视Observable
每次交易完成时的进度来监视它们的进度。我基本上是这样的:
def observeFuturesProgress(futs: List[Future[Int]]) : Observable[String] = {
Observable[String](observer => {
val loudFutures: List[Future[Int]] = futs.map(f => {
f onComplete {
case Success(a) => observer.onNext(s"just did $a more")
case Failure(e) => observer.onError(e)
}
f
})
Future.sequence(loudFutures) onComplete {
case Success(_) => observer.onCompleted()
case Failure(e) => observer.onError(e)
}
})
}
在我的测试环境中,这可以正常工作。但是我已经读过,onNext
不应从不同的线程调用它,至少要注意没有重叠的调用。建议的解决方法是什么?似乎许多实际用途Observables
都需要onNext
从这样的异步代码中调用,但我在文档中找不到类似的示例。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句