基本上我有一个类似的程序
var results = await Task.WhenAll(
from input in inputs
select Task.Run(async () => await InnerMethodAsync(input))
);
.
.
.
private static async Task<Output> InnerMethodAsync(Input input)
{
var x = await Foo(input);
var y = await Bar(x);
var z = await Baz(y);
return z;
}
我想知道是否有一种奇特的方法可以将其组合到单个 LINQ 查询中,就像“异步流”(我可以描述它的最佳方式)。
当您使用 LINQ 时,通常有两个部分:创建和迭代。
创作:
var query = list.Select( a => a.Name);
这些调用始终是同步的。但这段代码除了创建一个公开 IEnumerable 的对象之外,并没有做更多的事情。由于称为“延迟执行”的模式,实际工作要等到稍后才能完成。 迭代:
var results = query.ToList();
此代码采用可枚举值并获取每个项目的值,这通常会涉及回调委托的调用(在本例中为
a => a.Name
)。这是潜在昂贵的部分,并且可以从异步性中受益,例如如果你的回调类似于
async a => await httpClient.GetByteArrayAsync(a)
.因此,如果我们想使其异步,那么我们感兴趣的是迭代部分。
这里的问题是
ToList()
(以及大多数强制迭代的其他方法,如
Any()
或 Last()
)不是异步方法,因此您的回调委托将被同步调用,您最终会得到任务列表而不是您想要的数据。我们可以用这样的一段代码来解决这个问题:
public static class ExtensionMethods
{
static public async Task<List<T>> ToListAsync<T>(this IEnumerable<Task<T>> This)
{
var tasks = This.ToList(); //Force LINQ to iterate and create all the tasks. Tasks always start when created.
var results = new List<T>(); //Create a list to hold the results (not the tasks)
foreach (var item in tasks)
{
results.Add(await item); //Await the result for each task and add to results list
}
return results;
}
}
通过这个扩展方法,我们可以重写你的代码:
var results = await inputs.Select( async i => await InnerMethodAsync(i) ).ToListAsync();
^这应该为您提供您正在寻找的异步行为,并避免创建线程池任务,就像您的示例一样。
注意:如果您使用 LINQ-to-entities,则昂贵的部分(数据检索)不会暴露给您。对于 LINQ 到实体,您需要使用 EF 框架附带的
ToListAsync()DotNetFiddle只是一起使用了LINQ
和
async
- 你正在使用LINQ的select
来投影和启动一堆异步任务,然后在结果上await
,它提供了异步并行模式。虽然您可能只是提供了一个示例,但您的代码中有一些需要注意的事项(我已切换到 Lambda 语法,但适用相同的原则)
由于在第一个
await
var x = await Foo(input);
之前没有完成任何工作),因此这里没有 没有真正的理由使用
Task.Run
。 并且由于在调用 InnerMethodAsync
InnerMethodAsync
调用包装在 async lambda中(但要警惕
IDisposable
)
Task
返回的
InnerMethodAsync
并使用 Task.WhenAll
等待它们。var tasks = inputs
.Select(input => InnerMethodAsync(input)) // or just .Select(InnerMethodAsync);
var results = await Task.WhenAll(tasks);
使用异步和 Linq 可以实现更复杂的模式,但您应该看看Reactive Extensions 和 TPL Data Flow Library
IObservable<Output[]> query =
from input in inputs.ToObservable()
from x in Observable.FromAsync(() => Foo(input))
from y in Observable.FromAsync(() => Bar(x))
from z in Observable.FromAsync(() => Baz(y))
select z;
Output[] results = await query.ToArray();
简单。
只需 NuGet“System.Reactive”并将
using System.Reactive.Linq;
添加到您的代码中。
var data =await (linq query).ToListAsync();