This sample console application has two observables. The first pushes the numbers 1 to 100. AsyncClass subscribes to it and runs a long-running process for each number it receives.
When each of these async processes completes, I want to 'push' the result to two subscribers, which would then do something with the new value.
My attempts are commented in the source code below.
AsyncClass:
class AsyncClass
{
    private readonly IConnectableObservable<int> _source;
    private readonly IDisposable _sourceDisposeObj;
    public IObservable<string> _asyncOpObservable;

    public AsyncClass(IConnectableObservable<int> source)
    {
        _source = source;
        _sourceDisposeObj = _source.Subscribe(
            ProcessArguments,
            ExceptionHandler,
            Completed
        );
        _source.Connect();
    }

    private void Completed()
    {
        Console.WriteLine("Completed");
        Console.ReadKey();
    }

    private void ExceptionHandler(Exception exp)
    {
        throw exp;
    }

    private void ProcessArguments(int evtArgs)
    {
        Console.WriteLine("Argument being processed with value: " + evtArgs);
        //_asyncOpObservable = LongRunningOperationAsync("hello").Publish();
        // not going to work either since this creates a new observable for each value from main observer
    }

    // http://rxwiki.wikidot.com/101samples
    public IObservable<string> LongRunningOperationAsync(string param)
    {
        // should not be creating an observable here, rather 'pushing' values?
        return Observable.Create<string>(
            o => Observable.ToAsync<string, string>(DoLongRunningOperation)(param).Subscribe(o)
        );
    }

    private string DoLongRunningOperation(string arg)
    {
        return "Hello";
    }
}
Main:
static void Main(string[] args)
{
    var source = Observable
        .Range(1, 100)
        .Publish();
    var asyncObj = new AsyncClass(source);
    var _asyncTaskSource = asyncObj._asyncOpObservable;
    var ui1 = new UI1(_asyncTaskSource);
    var ui2 = new UI2(_asyncTaskSource);
}
UI1 (and UI2, they're basically the same):
class UI1
{
    private IConnectableObservable<string> _asyncTaskSource;
    private IDisposable _taskSourceDisposable;

    public UI1(IConnectableObservable<string> asyncTaskSource)
    {
        _asyncTaskSource = asyncTaskSource;
        _asyncTaskSource.Connect();
        _taskSourceDisposable = _asyncTaskSource.Subscribe(RefreshUI, HandleException, Completed);
    }

    private void Completed()
    {
        Console.WriteLine("UI1: Stream completed");
    }

    private void HandleException(Exception obj)
    {
        Console.WriteLine("Exception! " + obj.Message);
    }

    private void RefreshUI(string obj)
    {
        Console.WriteLine("UI1: UI refreshing with value " + obj);
    }
}
This is my first project with Rx so let me know if I should be thinking differently. Any help would be highly appreciated!
I'm going to let you know you should be thinking differently... :) Flippancy aside, this looks like a bad collision between object-oriented and functional-reactive styles.
It's not clear what the requirements are around the timing of the data flow and the caching of results here, and the use of Publish and IConnectableObservable is a little confused. I'm going to guess you want to stop the two downstream subscriptions from causing each value to be processed twice, and I'm basing some of my answer on that premise. Publish() can achieve this by letting multiple subscribers share a single subscription to the source.
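To make the duplication concrete, here is a minimal sketch, assuming the System.Reactive package. The `calls` counter and the `x * 10` step are hypothetical stand-ins for the expensive processing; they are not part of the original code.

```csharp
using System;
using System.Reactive.Linq;

static class Program
{
    static void Main()
    {
        var calls = 0; // how many times the "expensive" step ran

        // Hypothetical stand-in for expensive per-value processing.
        var processed = Observable.Range(1, 3)
            .Select(x => { calls++; return x * 10; });

        // Two direct subscriptions: the pipeline runs once per subscriber.
        processed.Subscribe(_ => { });
        processed.Subscribe(_ => { });
        Console.WriteLine("Unshared: " + calls + " calls");

        calls = 0;

        // Publish() shares one upstream subscription between subscribers.
        var shared = processed.Publish();
        shared.Subscribe(_ => { });
        shared.Subscribe(_ => { });
        shared.Connect(); // subscribes to the source exactly once
        Console.WriteLine("Shared: " + calls + " calls");
    }
}
```

Because Range emits synchronously on its default scheduler, this should report 6 calls for the unshared case and 3 for the shared one.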
Idiomatic Rx wants you to keep to a functional style. To do this, present the long-running work as a function. So, instead of trying to wire your AsyncClass logic directly into the Rx chain as a class, you could present it as a function, like this contrived example:
async Task<int> ProcessArgument(int argument)
{
    // perform your lengthy calculation - maybe in an OO style,
    // maybe creating class instances and invoking methods etc.
    await Task.Delay(TimeSpan.FromSeconds(1));
    return argument + 1;
}
Now you can construct a complete Rx observable chain that calls this function, and by using Publish().RefCount() you can avoid multiple subscribers causing duplicate effort. Note how this separates concerns too: the code processing the value is simpler because the reuse is handled elsewhere.
var query = source.SelectMany(x => ProcessArgument(x).ToObservable())
                  .Publish().RefCount();
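For reference, the chain can be put into a complete console program along these lines. This is only a sketch under stated assumptions: a Subject<int> stands in for the real source so that values are pushed only after both subscribers are attached, the `_calls` counter and the `done` signal are illustrative additions, and the delay is shortened to keep the run quick.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

static class Program
{
    private static int _calls; // counts how often the long-running work runs

    // Stand-in for the long-running work, as in the contrived example.
    private static async Task<int> ProcessArgument(int argument)
    {
        Interlocked.Increment(ref _calls);
        await Task.Delay(TimeSpan.FromMilliseconds(100));
        return argument + 1;
    }

    static async Task Main()
    {
        // Assumption: a Subject replaces the real source for this demo.
        var source = new Subject<int>();

        var query = source
            .SelectMany(x => ProcessArgument(x).ToObservable())
            .Publish()
            .RefCount();

        // Completed by the second subscriber when the stream ends.
        var done = new TaskCompletionSource<bool>();

        var ui1 = query.Subscribe(v => Console.WriteLine("UI1: " + v));
        var ui2 = query.Subscribe(
            v => Console.WriteLine("UI2: " + v),
            () => done.TrySetResult(true));

        for (var i = 1; i <= 3; i++) source.OnNext(i);
        source.OnCompleted();

        await done.Task; // wait until all results have been delivered
        Console.WriteLine("ProcessArgument calls: " + _calls);

        ui1.Dispose();
        ui2.Dispose();
    }
}
```

Both subscribers receive every result, while ProcessArgument runs once per source value (three calls here) rather than once per subscriber. The order in which results arrive is not guaranteed, since the tasks run concurrently.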
By creating a single chain for subscribers, the work is only started when necessary, on subscription. I've used Publish().RefCount(), but if you want to ensure values aren't missed by the second and subsequent subscribers, you could use Replay (easy), or use Publish() and then Connect. In the latter case you'll want the Connect logic outside the individual subscribers' code, because it only needs to be called once, after all subscribers have subscribed.
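The two alternatives might look roughly like this. It is a sketch assuming the System.Reactive package; the `work` query is a hypothetical synchronous stand-in for the processing chain, and the "UI1"/"UI2"/"Late" labels are illustrative only.

```csharp
using System;
using System.Reactive.Linq;

static class Program
{
    static void Main()
    {
        // Hypothetical stand-in for the processing chain.
        var work = Observable.Range(1, 3).Select(x => x + 1);

        // Publish: attach every subscriber first, then connect once.
        var published = work.Publish();
        published.Subscribe(v => Console.WriteLine("UI1: " + v));
        published.Subscribe(v => Console.WriteLine("UI2: " + v));
        published.Connect(); // called by the composing code, not by UI1/UI2

        // Replay: values are buffered, so a late subscriber still sees them.
        var replayed = work.Replay();
        replayed.Connect(); // the source runs before anyone has subscribed
        replayed.Subscribe(v => Console.WriteLine("Late: " + v));
    }
}
```

With Publish, a subscriber attached after Connect would miss whatever had already been emitted. With Replay, the late subscriber still receives 2, 3 and 4, at the cost of buffering the values in memory.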