RxSwift 中的滑动窗口

来自 RxJava 背景,我无法提出在 RxSwift 中实现滑动窗口的标准方法。例如,我有以下事件序列:

1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, ...

让我们想象一下事件发射在一秒钟内发生两次。我想要做的是将这个序列转换成一个缓冲区序列,每个缓冲区包含最后三秒的数据。另外,每个缓冲区将每秒发出一次。所以结果看起来像这样:

[1,2,3,4,5,6], [3,4,5,6,7,8], [5,6,7,8,9,10], ...

我将在 RxJava 中做的是我将使用该buffer方法的重载之一,如下所示:

stream.buffer(3000, 1000, TimeUnit.MILLISECONDS)

这正是我需要完成的结果:缓冲区序列,每个缓冲区每秒发出一次并包含最后三秒的数据。

我广泛地检查了 RxSwift 文档,但没有发现任何buffer允许我这样做运算符重载我是否错过了一些不明显的(对于 RxJava 用户,ofc)操作符?

丹尼尔·T。

我最初使用自定义运算符编写了解决方案。从那以后,我想出了如何使用标准运算符来完成它。

extension ObservableType {

    func buffer(timeSpan: RxTimeInterval, timeShift: RxTimeInterval, scheduler: SchedulerType) -> Observable<[E]> {
        let trigger = Observable<Int>.timer(timeSpan, period: timeShift, scheduler: scheduler)
            .takeUntil(self.takeLast(1))

        let buffer = self
            .scan([Date: E]()) { previous, current in
                var next = previous
                let now = scheduler.now
                next[now] = current
                return next.filter { $0.key > now.addingTimeInterval(-timeSpan) }
        }

        return trigger.withLatestFrom(buffer)
            .map { $0.sorted(by: { $0.key <= $1.key }).map { $0.value } }
    }
}

我将在下面留下我的原始解决方案以供后代使用:


编写您自己的运算符是这里的解决方案。

extension ObservableType {

    func buffer(timeSpan: RxTimeInterval, timeShift: RxTimeInterval, scheduler: SchedulerType) -> Observable<[E]> {
        return Observable.create { observer in
            var buf: [Date: E] = [:]
            let lock = NSRecursiveLock()
            let elementDispoable = self.subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case let .next(element):
                    buf[Date()] = element
                case .completed:
                    observer.onCompleted()
                case let .error(error):
                    observer.onError(error)
                }
            }
            let spanDisposable = scheduler.schedulePeriodic((), startAfter: timeSpan, period: timeShift, action: { state in
                lock.lock(); defer { lock.unlock() }
                let now = Date()
                buf = buf.filter { $0.key > now.addingTimeInterval(-timeSpan) }
                observer.onNext(buf.sorted(by: { $0.key <= $1.key }).map { $0.value })
            })
            return Disposables.create([spanDisposable, elementDispoable])
        }
    }
}

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

RxSwift中的双向绑定

来自分类Dev

在 RXSwift 中返回 BehaviourSubject

来自分类Dev

RxSwift中关于flatMapLatest的困惑

来自分类Dev

在RxSwift中延迟集合的迭代

来自分类Dev

RxSwift 链接:在 Rxswift 中编写这个的更好方法是什么?

来自分类Dev

Python中的滑动窗口

来自分类Dev

在RxSwift中手动处置DisposeBag

来自分类Dev

如何在RxSwift中配置Bool流

来自分类Dev

RXSwift闭包中的“ [弱自我]”

来自分类Dev

RxSwift-获取列表中的每个项目

来自分类Dev

如何在RxSwift中设置延迟?

来自分类Dev

RxSwift:观察存储在 UserDefaults 中的数组

来自分类Dev

RxSwift 中的可观察元素数组

来自分类Dev

如何在 RxSwift 中组合多个 Observable

来自分类Dev

在 RxSwift 中迭代并添加到数组

来自分类Dev

Excel中的可变滑动窗口

来自分类Dev

功能编程中的滑动窗口匹配

来自分类Dev

在滑动窗口步骤中合并列

来自分类Dev

Python中的CSV文件(滑动窗口)

来自分类Dev

apache-flink:输出中的滑动窗口

来自分类Dev

R中的滑动归一化窗口

来自分类Dev

在RxSwift中重新启动可观察间隔的正确方法

来自分类Dev

在RxSwift中手动发出事件

来自分类Dev

在RxSwift单元测试中模拟和验证结果

来自分类Dev

链接RxSwift和Alamofire的异步请求中的call func

来自分类Dev

RxSwift中的PublishSubject和PublishRelay有什么区别?

来自分类Dev

如何观察属性是否在RxSwift中的特定间隔内不变

来自分类Dev

处理UITableView绑定中的连接错误(Moya,RxSwift,RxCocoa)

来自分类Dev

在 MVVM 和 RxSwift 中围绕 ViewControllers 传递数据