如果将IEnumerable与async / await一起使用(从带有Dapper的SQL Server中流式传输数据)一起使用,会发生什么情况?

伊利亚·切尔诺莫迪克(Ilya Chernomordik)

我正在使用Dapper从SQL Server中的一个非常大的数据集中流式传输数据。返回IEnumerable和调用可以正常工作Query(),但是当我切换到时QueryAsync(),该程序似乎尝试从SQL Server而非流式读取所有数据。

根据这个问题,它可以正常工作buffered: false,我正在做,但是这个问题什么也没说async/await

现在根据这个问题,要完成我想做的事情并不容易QueryAsync()

我是否正确理解在切换上下文时会枚举可枚举的对象async/await

另一个问题是,当新的C#8异步流可用时,是否可以做到这一点?

帕纳吉奥提斯·卡纳沃斯(Panagiotis Kanavos)

2020年3月更新

.NET Core 3.0(和3.1)现已推出,完全支持异步流。Microsoft.Bcl.AsyncInterfaces增加了对他们的支持,.NET 2.0标准和.NET Framework 4.6.1+,虽然4.7.2应当用于理智的原因。正如.NET Standard实施支持文档中所述

虽然NuGet认为.NET Framework 4.6.1支持.NET Standard 1.5至2.0,但使用为.NET Framework 4.6.1项目中的那些版本构建的.NET Standard库存在一些问题。

对于需要使用此类库的.NET Framework项目,建议您将项目升级为目标.NET Framework 4.7.2或更高版本。

原始答案

如果查看源代码,您会发现您的怀疑几乎是正确的。buffered为false时,QueryAsync同步

if (command.Buffered)
{
    var buffer = new List<T>();
    var convertToType = Nullable.GetUnderlyingType(effectiveType) ?? effectiveType;
    while (await reader.ReadAsync(cancel).ConfigureAwait(false))
    {
        object val = func(reader);
        if (val == null || val is T)
        {
            buffer.Add((T)val);
        }
        else
        {
            buffer.Add((T)Convert.ChangeType(val, convertToType, CultureInfo.InvariantCulture));
        }
    }
    while (await reader.NextResultAsync(cancel).ConfigureAwait(false)) { /* ignore subsequent result sets */ }
    command.OnCompleted();
    return buffer;
}
else
{
    // can't use ReadAsync / cancellation; but this will have to do
    wasClosed = false; // don't close if handing back an open reader; rely on the command-behavior
    var deferred = ExecuteReaderSync<T>(reader, func, command.Parameters);
    reader = null; // to prevent it being disposed before the caller gets to see it
    return deferred;
}

正如评论所解释的,ReadAsync当期望返回类型为IEnumerable时,将无法使用这就是为什么必须引入C#8的异步枚举的原因。

ExecuteReaderSync的代码是:

private static IEnumerable<T> ExecuteReaderSync<T>(IDataReader reader, Func<IDataReader, object> func, object parameters)
{
    using (reader)
    {
        while (reader.Read())
        {
            yield return (T)func(reader);
        }
        while (reader.NextResult()) { /* ignore subsequent result sets */ }
        (parameters as IParameterCallbacks)?.OnCompleted();
    }
}

它使用Read代替ReadAsync

C#8异步流将允许对其进行重写以返回IAsyncEnumerable仅仅更改语言版本并不能解决问题。

鉴于异步流上的当前文档,这可能看起来像:

private static async IAsyncEnumerable<T> ExecuteReaderASync<T>(IDataReader reader, Func<IDataReader, object> func, object parameters)
{
    using (reader)
    {
        while (await reader.ReadAsync())
        {
            yield return (T)func(reader);
        }

        while (await reader.NextResultAsync(cancel).ConfigureAwait(false)) { /* ignore subsequent result sets */ }
         command.OnCompleted();
        (parameters as IParameterCallbacks)?.OnCompleted();
    }
}

Buuuuuut异步流是只能在.NET Core上运行的功能之一,并且可能尚未实现。当我尝试在Sharplab.io中写一个时,是Kaboom。[connection lost, reconnecting…]

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

Related 相关文章

热门标签

归档