有没有办法结合LINQ和异步

问题描述 投票:0回答:4

基本上我有一个类似的程序

var results = await Task.WhenAll(
    from input in inputs
    select Task.Run(async () => await InnerMethodAsync(input))
);
.
.
.
private static async Task<Output> InnerMethodAsync(Input input)
{
    var x = await Foo(input);
    var y = await Bar(x);
    var z = await Baz(y);
    return z;
}

我想知道是否有一种奇特的方法可以将其组合到单个 LINQ 查询中,就像“异步流”(我可以描述它的最佳方式)。

c# .net linq asynchronous async-await
4个回答
9
投票

当您使用 LINQ 时,通常有两个部分:创建和迭代。

创作:

var query = list.Select( a => a.Name);

这些调用始终是同步的。但这段代码除了创建一个公开 IEnumerable 的对象之外,并没有做更多的事情。由于称为“延迟执行”的模式,实际工作要等到稍后才能完成。 迭代:

var results = query.ToList();

此代码采用可枚举值并获取每个项目的值,这通常会涉及回调委托的调用(在本例中为 
a => a.Name

)。这是潜在昂贵的部分,并且可以从异步性中受益,例如如果你的回调类似于

async a => await httpClient.GetByteArrayAsync(a)
.

因此,如果我们想使其异步,那么我们感兴趣的是迭代部分。

这里的问题是

ToList()

(以及大多数强制迭代的其他方法,如

Any()
Last()
)不是异步方法,因此您的回调委托将被同步调用,您最终会得到任务列表而不是您想要的数据。

我们可以用这样的一段代码来解决这个问题:

public static class ExtensionMethods { static public async Task<List<T>> ToListAsync<T>(this IEnumerable<Task<T>> This) { var tasks = This.ToList(); //Force LINQ to iterate and create all the tasks. Tasks always start when created. var results = new List<T>(); //Create a list to hold the results (not the tasks) foreach (var item in tasks) { results.Add(await item); //Await the result for each task and add to results list } return results; } }

通过这个扩展方法,我们可以重写你的代码:

var results = await inputs.Select( async i => await InnerMethodAsync(i) ).ToListAsync();

^这应该为您提供您正在寻找的异步行为,并避免创建线程池任务,就像您的示例一样。

注意:如果您使用 LINQ-to-entities,则昂贵的部分(数据检索)不会暴露给您。对于 LINQ 到实体,您需要使用 EF 框架附带的

ToListAsync()

尝试一下并查看我在

DotNetFiddle

上的演示中的计时。


3
投票

只是一起使用了LINQ

async
- 你正在使用LINQ的
select
来投影和启动一堆异步任务,然后在结果上
await
,它提供了异步并行模式。

虽然您可能只是提供了一个示例,但您的代码中有一些需要注意的事项(我已切换到 Lambda 语法,但适用相同的原则)

由于在第一个
    await
  • 之前每个任务的 CPU 绑定工作基本上为零(即在
    var x = await Foo(input);
    之前没有完成任何工作),因此这里没有
    没有真正的理由
    使用 Task.Run
    并且由于在调用 
  • InnerMethodAsync
  • 之后 lambda 中没有任何工作要做,因此您不需要将
    InnerMethodAsync
    调用包装在
    async lambda
    中(但要警惕 IDisposable
    
    
    
  • 即您只需选择从
Task

返回的

InnerMethodAsync
并使用
Task.WhenAll
等待它们。

var tasks = inputs .Select(input => InnerMethodAsync(input)) // or just .Select(InnerMethodAsync); var results = await Task.WhenAll(tasks);

使用异步和 Linq 可以实现更复杂的模式,但您应该看看 
Reactive Extensions 和 TPL Data Flow Library

,而不是重新发明轮子,它们具有许多用于复杂流程的构建块。


2
投票

IObservable<Output[]> query = from input in inputs.ToObservable() from x in Observable.FromAsync(() => Foo(input)) from y in Observable.FromAsync(() => Bar(x)) from z in Observable.FromAsync(() => Baz(y)) select z; Output[] results = await query.ToArray();

简单。

只需 NuGet“System.Reactive”并将

using System.Reactive.Linq;

添加到您的代码中。

    


0
投票

var data =await (linq query).ToListAsync();

	
© www.soinside.com 2019 - 2024. All rights reserved.