How to yield return item when doing Task.WhenAny

John

I have two projects in my solution: a WPF project and a class library.

In my class library:

I have a List of Symbol:

class Symbol
{
     Identifier Identifier {get;set;}
     List<Quote> HistoricalQuotes {get;set;}
     List<Financial> HistoricalFinancials {get;set;}
}

For each symbol, I query a financial service to retrieve its historical financial data using a web request (webClient.DownloadStringTaskAsync(uri)).

Here is the method that does that:

    public async Task<IEnumerable<Symbol>> GetSymbolsAsync()
    {
        var historicalFinancialTask = new List<Task<HistoricalFinancialResult>>();

        foreach (var symbol in await _listSymbols)
        {
            historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol));
        }

        while (historicalFinancialTask.Count > 0)
        {
            var historicalFinancial = await Task.WhenAny(historicalFinancialTask);
            historicalFinancialTask.Remove(historicalFinancial);

            // the line below doesn't compile, which is understandable because method's return type is a Task of something
            yield return new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data); 
        }
    }

    private async Task<HistoricalFinancialResult> GetFinancialsQueryAsync(Symbol symbol)
    {
        var result = new HistoricalFinancialResult();
        result.Symbol = symbol;
        result.Data = await _financialsQuery.GetFinancialsQuery(symbol.Identifier); // contains some logic like parsing and use WebClient to query asynchronously
        return result;
    }

    private class HistoricalFinancialResult
    {
        public Symbol Symbol { get; set; }
        public IEnumerable<Financial> Data { get; set; }

        // equality members
    }

As you can see, each time the historical financial data for a symbol finishes downloading, I want to yield that result instead of waiting for all my calls to the financial service to complete.

And in my WPF project, here's what I would like to do:

foreach(var symbol in await _service.GetSymbolsAsync())
{
      SymbolsObservableCollection.Add(symbol);
}

It seems we can't use yield return in an async method, so what solution can I use, other than moving my GetSymbols method into my WPF project?

Ian Griffiths

While I like the TPL Dataflow components (which svick suggests you use), moving over to that system does require a substantial commitment - it's not something you can just add to an existing design. It offers considerable benefits if you're performing high volumes of CPU-intensive data processing and want to exploit many CPU cores. But getting the best out of it is non-trivial.

His other suggestion, using Rx, might be easier to integrate with an existing solution. (See the original documentation, but for the latest code, use the Rx-Main NuGet package. Or, if you'd like to look at the source, see the Rx CodePlex site.) It would even be possible for the calling code to carry on using an IEnumerable<Symbol> if you want - you can use Rx purely as an implementation detail, [edit 2013/11/09 to add:] although as svick has pointed out, that's probably not a good idea, given your end goal.

Before I show you an example, I want to be clear about what exactly we're doing. Your example had a method with this signature:

public async Task<IEnumerable<Symbol>> GetSymbolsAsync()

That return type, Task<IEnumerable<Symbol>>, essentially says "This is a method that produces a single result of type IEnumerable<Symbol>, and it may not produce that result immediately."

It's that single result bit that I think is causing you grief, because that's not really what you want. A Task<T> (no matter what T may be) represents a single asynchronous operation. It may have many steps (many uses of await if you implement it as a C# async method) but ultimately it produces one thing. You want to produce multiple things, at different times, so Task<T> is not a good fit.

If you were really going to do what your method signature promises - producing one result eventually - one way you could do this is to have your async method build a list and then produce that as the result when it's good and ready:

// Note: this first example is *not* what you want.
// However, it is what your method's signature promises to do.
public async Task<IEnumerable<Symbol>> GetSymbolsAsync()
{
    var historicalFinancialTask = new List<Task<HistoricalFinancialResult>>();

    foreach (var symbol in await _listSymbols)
    {
        historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol));
    }

    var results = new List<Symbol>();
    while (historicalFinancialTask.Count > 0)
    {
        var historicalFinancial = await Task.WhenAny(historicalFinancialTask);
        historicalFinancialTask.Remove(historicalFinancial);

        results.Add(new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data)); 
    }

    return results;
}

This method does what its signature says: it asynchronously produces a sequence of symbols.

But presumably you'd like to create an IEnumerable<Symbol> that produces the items as they become available, rather than waiting until they're all available. (Otherwise, you might as well just use WhenAll.) You can do that, but yield return is not the way.
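To make the WhenAll comparison concrete, here is a minimal sketch of the all-at-once shape. FetchAsync is a hypothetical stand-in for GetFinancialsQueryAsync (it just waits and returns a label), and the delays are arbitrary. Task.WhenAll hands back results in the order of the input tasks, not in completion order, and the caller sees nothing until the slowest task is done.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class WhenAllSketch
{
    // Hypothetical stand-in for GetFinancialsQueryAsync: waits, then returns its label.
    public static async Task<string> FetchAsync(string label, int delayMs)
    {
        await Task.Delay(delayMs);
        return label;
    }

    // Single-result shape: one Task that completes only once every fetch has completed.
    public static async Task<IReadOnlyList<string>> FetchAllAsync()
    {
        var tasks = new[]
        {
            FetchAsync("slow", 300),
            FetchAsync("fast", 50),
        };

        // Results come back in the order of the input tasks,
        // not in the order in which the tasks finished.
        string[] results = await Task.WhenAll(tasks);
        return results;
    }
}
```

Awaiting FetchAllAsync here yields "slow" followed by "fast", even though "fast" finished first.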

In short, what I think you want to do is produce an asynchronous list. There's a type for that: IObservable<T> expresses exactly what I believe you were hoping to express with your Task<IEnumerable<Symbol>>: it's a sequence of items (just like IEnumerable<T>) but asynchronous.

It may help to understand it by analogy:

public Symbol GetSymbol() ...

is to

public Task<Symbol> GetSymbolAsync() ...

as

public IEnumerable<Symbol> GetSymbols() ...

is to:

public IObservable<Symbol> GetSymbolsObservable() ...

(Unfortunately, unlike with Task<T> there isn't a common naming convention for what to call an asynchronous sequence-oriented method. I've added 'Observable' on the end here, but that's not universal practice. I certainly wouldn't call it GetSymbolsAsync because people will expect that to return a Task.)

To put it another way, Task<IEnumerable<T>> says "I'll produce this collection when I'm good and ready" whereas IObservable<T> says: "Here's a collection. I'll produce each item when I'm good and ready."
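If that push-based contract is unfamiliar, the following sketch shows it using only the IObservable<T> and IObserver<T> interfaces from the System namespace, with no Rx. RecordingObserver, DelayedLabels and the Completion property are names invented for illustration, and Task.Delay stands in for a real request. Hand-rolling IObservable<T> like this is for demonstration only: it ignores unsubscription, and real code would normally let Rx build the observable.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical observer that records every notification it receives.
public sealed class RecordingObserver : IObserver<string>
{
    public List<string> Events { get; } = new List<string>();

    public void OnNext(string value) => Events.Add("next:" + value);
    public void OnError(Exception error) => Events.Add("error:" + error.Message);
    public void OnCompleted() => Events.Add("completed");
}

// Hypothetical source that pushes each label once its simulated fetch finishes.
public sealed class DelayedLabels : IObservable<string>
{
    // Exposed only so the sketch can wait for the producer;
    // it is not part of the IObservable<T> contract.
    public Task Completion { get; private set; } = Task.CompletedTask;

    public IDisposable Subscribe(IObserver<string> observer)
    {
        // Start producing and return immediately; items arrive later via OnNext.
        Completion = ProduceAsync(observer);
        return new NoOpSubscription();
    }

    private static async Task ProduceAsync(IObserver<string> observer)
    {
        try
        {
            foreach (var label in new[] { "a", "b" })
            {
                await Task.Delay(20); // stand-in for an asynchronous request
                observer.OnNext(label); // "here's an item, now that it's ready"
            }
        }
        catch (Exception ex)
        {
            observer.OnError(ex); // the sequence ends with a failure
            return;
        }

        observer.OnCompleted(); // the sequence ends normally
    }

    // This sketch does not support cancelling a subscription.
    private sealed class NoOpSubscription : IDisposable
    {
        public void Dispose() { }
    }
}
```

Subscribe returns before any item exists, which is the point of the analogy: the collection is handed over up front and its items are delivered one at a time as they become ready.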

So, you want a method that returns a sequence of Symbol objects, where those objects are produced asynchronously. That tells us that you should really be returning an IObservable<Symbol>. Here's an implementation:

// Unlike the first example, this *is* what you want.
public IObservable<Symbol> GetSymbolsRx()
{
    return Observable.Create<Symbol>(async obs =>
    {
        var historicalFinancialTask = new List<Task<HistoricalFinancialResult>>();

        foreach (var symbol in await _listSymbols)
        {
            historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol));
        }

        while (historicalFinancialTask.Count > 0)
        {
            var historicalFinancial = await Task.WhenAny(historicalFinancialTask);
            historicalFinancialTask.Remove(historicalFinancial);

            obs.OnNext(new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data));
        }
    });
}

As you can see, this lets you write pretty much what you were hoping to write - the body of this code is almost identical to yours. The only difference is that where you were using yield return (which didn't compile), this calls the OnNext method on an object supplied by Rx.

Having written that, you can easily wrap this in an IEnumerable<Symbol> ([Edited 2013/11/29 to add:] although you probably don't actually want to do this - see addition at end of answer):

public IEnumerable<Symbol> GetSymbols()
{
    return GetSymbolsRx().ToEnumerable();
}

This may not look asynchronous, but it does in fact allow the underlying code to operate asynchronously. When you call this method, it will not block - even if the underlying code that does the work of fetching the financial information cannot produce a result immediately, this method will nonetheless immediately return an IEnumerable<Symbol>. Now of course, any code that attempts to iterate through that collection will end up blocking if data is not yet available. But the critical thing is that it does what I think you were originally trying to achieve:

  • You get to write an async method that does the work (a delegate in my example, passed as an argument to Observable.Create<T> but you could write a standalone async method if you prefer)
  • The calling code will not be blocked merely as a result of asking you to start fetching the symbols
  • The resulting IEnumerable<Symbol> will produce each individual item as soon as it becomes available

This works because Rx's ToEnumerable method has some clever code in it that bridges the gap between the synchronous world view of IEnumerable<T> and asynchronous production of results. (In other words, this does exactly what you were disappointed to discover C# wasn't able to do for you.)

If you're curious, you can look at the source. The code that underlies what ToEnumerable does can be found at https://rx.codeplex.com/SourceControl/latest#Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GetEnumerator.cs
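To get a feel for that kind of bridging without reading the Rx source, here is a sketch of an analogous idea using BlockingCollection<T> from the BCL. This is not how ToEnumerable is implemented; it only shows the same observable behaviour: the method returns at once, and iteration blocks until each item is ready. The names BlockingBridgeSketch, GetLabels and ProduceAsync are hypothetical, and the sketch assumes a console-style program with no synchronization context. Blocking on the enumeration from a UI thread that the producer's awaits need to resume on would deadlock.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BlockingBridgeSketch
{
    // Returns immediately; items appear in the sequence as the producer adds them.
    public static IEnumerable<string> GetLabels()
    {
        var buffer = new BlockingCollection<string>();

        // Start the asynchronous producer without waiting for it.
        // Assumption: no synchronization context is captured here (console app).
        _ = ProduceAsync(buffer);

        // Enumerating this sequence blocks until an item arrives
        // or until CompleteAdding has been called.
        return buffer.GetConsumingEnumerable();
    }

    private static async Task ProduceAsync(BlockingCollection<string> buffer)
    {
        try
        {
            foreach (var label in new[] { "first", "second", "third" })
            {
                await Task.Delay(50); // stand-in for a web request
                buffer.Add(label);
            }
        }
        finally
        {
            // Without this, the consumer's foreach would block forever.
            buffer.CompleteAdding();
        }
    }
}
```

A plain foreach over GetLabels() then receives each label as soon as the producer adds it, and the loop ends when the producer finishes.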

[Edited 2013/11/29 to add:]

svick has pointed out in the comments something I missed: your final goal is to put the contents into an ObservableCollection<Symbol>. Somehow I didn't see that bit. That means IEnumerable<T> is the wrong way to go - you want to populate the collection as items become available, rather than going through them with a foreach loop. So you'd just do this:

GetSymbolsRx().Subscribe(symbol => SymbolsObservableCollection.Add(symbol));

or something along those lines. That will add items to the collection as and when they become available.

This depends on the whole thing being kicked off on the UI thread, by the way. As long as it is, your async code should end up running on the UI thread, meaning that when items are added to the collection, that also happens on the UI thread. But if for some reason you end up launching things from a worker thread (or if you were to use ConfigureAwait(false) on any of the awaits, thus breaking the connection with the UI thread) you'd need to arrange to handle the items from the Rx stream on the right thread:

GetSymbolsRx()
    .ObserveOnDispatcher()
    .Subscribe(symbol => SymbolsObservableCollection.Add(symbol));

If you're on the UI thread when you do that, it'll pick up the current dispatcher, and ensure all notifications arrive through it. If you're already on the wrong thread when you come to subscribe, you can use the ObserveOn overload that takes a dispatcher. (These require you to have a reference to System.Reactive.Windows.Threading. And these are extension methods, so you'll need a using directive for their containing namespace, which is also called System.Reactive.Windows.Threading.)
