How to manage observable subscription for dependent observables?

javaCity

This sample console application has 2 observables. The first one pushes numbers from 1 to 100. This observable is subscribed by the AsyncClass which runs a long running process for each number it gets. Upon completion of this new async process I want to be able to 'push' to 2 subscribers which would be doing something with this new value.

My attempts are commented in the source code below.

AsyncClass:

class AsyncClass
    {
        private readonly IConnectableObservable<int> _source;
        private readonly IDisposable _sourceDisposeObj;
        public IObservable<string> _asyncOpObservable;

        public AsyncClass(IConnectableObservable<int> source)
        {
            _source           = source;
            _sourceDisposeObj = _source.Subscribe(
                                    ProcessArguments,
                                    ExceptionHandler,
                                    Completed
                                );
            _source.Connect();


        }

        private void Completed()
        {
            Console.WriteLine("Completed");
            Console.ReadKey();
        }

        private void ExceptionHandler(Exception exp)
        {
            throw exp;
        }

        private void ProcessArguments(int evtArgs)
        {
            Console.WriteLine("Argument being processed with value: " + evtArgs);

            //_asyncOpObservable = LongRunningOperationAsync("hello").Publish(); 
            // not going to work either since this creates a new observable for each value from main observer

        }
        // http://rxwiki.wikidot.com/101samples
        public IObservable<string> LongRunningOperationAsync(string param)
        {
            // should not be creating an observable here, rather 'pushing' values?
            return Observable.Create<string>(
                o => Observable.ToAsync<string, string>(DoLongRunningOperation)(param).Subscribe(o)
            );
        }

        private string DoLongRunningOperation(string arg)
        {
            return "Hello";
        }
    }

Main:

static void Main(string[] args)
        {
            var source = Observable
                            .Range(1, 100)
                            .Publish();

            var asyncObj = new AsyncClass(source);
            var _asyncTaskSource = asyncObj._asyncOpObservable;

            var ui1 = new UI1(_asyncTaskSource);
            var ui2 = new UI2(_asyncTaskSource);
        }

UI1 (and UI2, they're basically the same):

 class UI1
    {
        private IConnectableObservable<string> _asyncTaskSource;
        private IDisposable _taskSourceDisposable;

        public UI1(IConnectableObservable<string> asyncTaskSource)
        {
            _asyncTaskSource = asyncTaskSource;
            _asyncTaskSource.Connect();
            _taskSourceDisposable = _asyncTaskSource.Subscribe(RefreshUI, HandleException, Completed);
        }

        private void Completed()
        {
            Console.WriteLine("UI1: Stream completed");
        }

        private void HandleException(Exception obj)
        {
            Console.WriteLine("Exception! "+obj.Message);
        }

        private void RefreshUI(string obj)
        {
            Console.WriteLine("UI1: UI refreshing with value "+obj);
        }
    }

This is my first project with Rx so let me know if I should be thinking differently. Any help would be highly appreciated!

James World

I'm going to let you know you should be thinking differently... :) Flippancy aside, this looks like a case of bad collision between object-oriented and functional-reactive styles.

It's not clear what the requirements are around timing of the data flow and caching of results here - the use of Publish and IConnectableObservable is a little confused. I'm going to guess you want to avoid the 2 downstream subscriptions causing the processing of a value being duplicated? I'm basing some of my answer on that premise. The use of Publish() can achieve this by allowing multiple subscribers to share a subscription to a single source.

Idiomatic Rx wants you to try and keep to a functional style. In order to do this, you want to present the long running work as a function. So let's say, instead of trying to wire your AsyncClass logic directly into the Rx chain as a class, you could present it as a function like this contrived example:

async Task<int> ProcessArgument(int argument)
{
    // perform your lengthy calculation - maybe in an OO style,
    // maybe creating class instances and invoking methods etc.
    await Task.Delay(TimeSpan.FromSeconds(1));
    return argument + 1;
}

Now, you can construct a complete Rx observable chain calling this function, and through the use of Publish().RefCount() you can avoid multiple subscribers causing duplicate effort. Note how this separates concerns too - the code processing the value is simpler because the reuse is handled elsewhere.

var query = source.SelectMany(x => ProcessArgument(x).ToObservable())
                  .Publish().RefCount();

By creating a single chain for subscribers, the work is only started when necessary on subscription. I've used Publish().RefCount() - but if you want to ensure values aren't missed by the second and subsequent subscribers, you could use Replay (easy) or use Publish() and then Connect - but you'll want the Connect logic outside the individual subscriber's code because you just need to call it once when all subscribers have subscribed.

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

RXJS AngularFire-从Observable获取Observables

来自分类Dev

可无限滚动的RxJs Observable或如何组合Observables

来自分类Dev

rxjs observables生命周期:检测完成的observable

来自分类Dev

RxJava-合并的Observable可以随时接受更多Observables吗?

来自分类Dev

Dependent Sliders with Bokeh, how to write the callbacks

来自分类Dev

How do I combine three observables such that

来自分类Dev

How to manage large VHDL testbenches

来自分类Dev

How to manage GET Method in an API

来自分类Dev

How to manage a mutex in an asynchronous method

来自分类Dev

What is the functional way to properly set a dependent predicate for Observable sequence without side effect?

来自分类Dev

两个 observables 的 Observable.race 是否不止一次工作?

来自分类Dev

Observables在扩展器中初始化/附加到Observable,未在页面加载时初始化

来自分类Dev

Delphi - How Indy TCPServer manage clients connections?

来自分类Dev

How to manage multiple positive implicit feedbacks?

来自分类Dev

类型'Subscription'缺少类型'Observable <any>'中的以下属性:_isScalar,source,operator,lift和另外6个

来自分类Dev

How to show dependent select boxes options in angular js

来自分类Dev

如何在不使用计算的 observables 的情况下将 observable 数组上的 Knockout foreach 一次限制为 5 个?

来自分类Dev

How did I manage to combine O and c to © in PuTTY?

来自分类Dev

How did I manage to combine O and c to © in PuTTY?

来自分类Dev

How to re-evaluate c++ function bound to qml property on dependent property change?

来自分类Dev

将 observables 组合成 observables 数组

来自分类Dev

RxJava: chaining observables

来自分类Dev

连接rxjs Observables

来自分类Dev

Android Observables中的StickyEvents?

来自分类Dev

RxJava并行获取Observables

来自分类Dev

Angular Observables和Http

来自分类Dev

Observables数组角度rxjs

来自分类Dev

测试结合了Observables的函数

来自分类Dev

Android Observables中的StickyEvents?

Related 相关文章

  1. 1

    RXJS AngularFire-从Observable获取Observables

  2. 2

    可无限滚动的RxJs Observable或如何组合Observables

  3. 3

    rxjs observables生命周期:检测完成的observable

  4. 4

    RxJava-合并的Observable可以随时接受更多Observables吗?

  5. 5

    Dependent Sliders with Bokeh, how to write the callbacks

  6. 6

    How do I combine three observables such that

  7. 7

    How to manage large VHDL testbenches

  8. 8

    How to manage GET Method in an API

  9. 9

    How to manage a mutex in an asynchronous method

  10. 10

    What is the functional way to properly set a dependent predicate for Observable sequence without side effect?

  11. 11

    两个 observables 的 Observable.race 是否不止一次工作?

  12. 12

    Observables在扩展器中初始化/附加到Observable,未在页面加载时初始化

  13. 13

    Delphi - How Indy TCPServer manage clients connections?

  14. 14

    How to manage multiple positive implicit feedbacks?

  15. 15

    类型'Subscription'缺少类型'Observable <any>'中的以下属性:_isScalar,source,operator,lift和另外6个

  16. 16

    How to show dependent select boxes options in angular js

  17. 17

    如何在不使用计算的 observables 的情况下将 observable 数组上的 Knockout foreach 一次限制为 5 个?

  18. 18

    How did I manage to combine O and c to © in PuTTY?

  19. 19

    How did I manage to combine O and c to © in PuTTY?

  20. 20

    How to re-evaluate c++ function bound to qml property on dependent property change?

  21. 21

    将 observables 组合成 observables 数组

  22. 22

    RxJava: chaining observables

  23. 23

    连接rxjs Observables

  24. 24

    Android Observables中的StickyEvents?

  25. 25

    RxJava并行获取Observables

  26. 26

    Angular Observables和Http

  27. 27

    Observables数组角度rxjs

  28. 28

    测试结合了Observables的函数

  29. 29

    Android Observables中的StickyEvents?

热门标签

归档