Error Task[] tasks converison IEnumerable
//await Task.WhenAll(WithMaxConcurrency(tasks,3)); // getting error at tasks conversion
var tasks = new Func<Task>[]
{
() => jack(),
() => roy(),
() => sam()
};
await Task.WhenAll(WithMaxConcurrency(tasks,3)); // getting error at tasks conversion
private static IEnumerable<Task<T>> WithMaxConcurrency<T>(IEnumerable<Task<T>> tasks, int maxParallelism)
{
SemaphoreSlim maxOperations = new SemaphoreSlim(maxParallelism);
// The original tasks get wrapped in a new task that must first await a semaphore before the original task is called.
return tasks.Select(task => maxOperations.WaitAsync().ContinueWith(_ =>
{
try { return task; }
finally { maxOperations.Release(); }
}).Unwrap());
}
想要Task[]任务进入IEnumerable
问题似乎是如何限制任务的执行。目前尚不清楚这些任务的作用。 .NET 具有高级功能,允许在管道中并发处理大量数据。一种选择是使用
Parallel.ForEachAsync
使用特定数量的工作人员处理数据流
使用 Parallel.ForEachAsync
这个例子展示了如何检索 Github 用户 bios,一次 3 个:
var userHandlers = new []
{
"users/okyrylchuk",
"users/shanselman",
"users/jaredpar",
"users/davidfowl"
};
ParallelOptions parallelOptions = new()
{
MaxDegreeOfParallelism = 3
};
await Parallel.ForEachAsync(userHandlers, parallelOptions, async (uri, token) =>
{
var user = await client.GetFromJsonAsync<GitHubUser>(uri, token);
Console.WriteLine($"Name: {user.Name}\nBio: {user.Bio}\n");
});
这也可用于处理一组
Func<Task>
调用,充当作业队列。不过,这不如处理输入流有用:
var funcs= new Func<Task>[]
{
() => jack(),
() => roy(),
() => sam()
};
ParallelOptions parallelOptions = new()
{
MaxDegreeOfParallelism = 3
};
await Parallel.ForEachAsync(funcs, parallelOptions, async (func, _) =>
{
await func();
});
Parallel.ForEachAsync
不返回任何结果。代表必须将它们存储在 ConcurrentQueue 中:
var results=ConcurrentQueue<Whatever>();
await Parallel.ForEachAsync(funcs, parallelOptions, async (func, _) =>
{
var result=await func();
results.Enqueue(result);
});
使用数据流块
另一种选择是使用数据流块,例如 ActionBlock 来处理具有固定 DOP 的数据/调用:
var dop=new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 3
};
var downloader=new ActionBlock<string>(async uri=>{
var user = await client.GetFromJsonAsync<GitHubUser>(uri);
Console.WriteLine($"Name: {user.Name}\nBio: {user.Bio}\n");
},dop);
foreach(var uri in userHandlers)
{
await downloader.SendAsync(uri);
}
downloader.Complete();
await downloader.Completion;
块可以返回结果并组合成处理步骤的管道:
var downloader=new TransformBlock<Uri,FileInfo>(DownloadToCsv,dop);
var importer=new ActionBlock<FileInfo>(Importer);
downloader.LinkTo(importer,new DataflowLinkOptions {PropagateCompletion=true});
foreach(var uri in fileUris)
{
await downloader.SendAsync(uri);
}
downloader.Complete();
await importer.Completion;
在这种情况下,
downloader
块一次检索3个文件并将FileInfo
对象发送到Importer
块。该块一次将一个文件导入数据库。所有块同时工作,同时下载和导入数据。
当所有文件都被请求时,
downloader.Complete()
告诉 head 块我们完成了。之后,我们等待尾块处理所有待处理的工作 await importer.Completion;
数据流块也可以用作异步工作队列,但这不如正确使用它们灵活:
var queue=new TransformBlock<Func<Task<Whatever>>,Whatever>(func=>func(),dop);
var buffer=new BufferBlock<Whatever>();
queue.LinkTo(buffer);
foreach(var func in funcs)
{
await queue.SendAsync(func);
}
queue.Complete();
await queue.Completion;
if (buffer.TryReceiveAll(out var results))
{
//Use the results
}