我正在尝试使用
Parallel.ForEachAsync
执行文件上传,它可以工作,但会丢失排序顺序。有什么方法可以同步排序顺序或源列表和目标列表吗?
await Parallel.ForEachAsync(model.DestinationFiles,
new ParallelOptions { MaxDegreeOfParallelism = 20 }, async (file, CancellationToken) =>
{
var storeAsync = await _fileServerService.Init(displayUrl).StoreAsync(file.FileInfo, false, file.OutputFileName);
convertResultDto.Files.Add(new ConverterConvertResultFile(storeAsync));
});
之前我使用过 Linq 并行运算符 (PLINQ),它具有
AsOrdered
运算符来处理排序。无论如何,我认为 Parallel.ForEachAsync
更适合在具有 I/O 场景的异步方法中使用?
var storeFiles = model.DestinationFiles.AsParallel().AsOrdered().WithDegreeOfParallelism(50)
.Select(file => StoreAsync(file.FileInfo, false, file.OutputFileName).GetAwaiter().GetResult())
.Select(storeFile => new StoreFile
{
FileId = storeFile.FileId,
Url = storeFile.Url,
OutputFileName = storeFile.OutputFileName,
Size = storeFile.Size
});
在这种情况下,您想要获取一组结果并将它们存储在结果集合中。
Parallel
专为更多操作而设计,无结果。对于有结果的操作,您可以使用 PLINQ 进行 CPU 密集型操作,或使用异步并发进行 I/O 密集型操作。不幸的是,没有 Parallel.ForEachAsync
的 PLINQ 等效项,这将是与您当前代码最接近的等效项。
异步并发使用
Task.WhenAll
来获取多个异步操作的结果。它还可以使用SemaphoreSlim
进行节流。像这样的东西:
var mutex = new SemaphoreSlim(20);
var results = await Task.WhenAll(model.DestinationFiles.Select(async file =>
{
await mutex.WaitAsync();
try
{
var storeAsync = await _fileServerService.Init(displayUrl).StoreAsync(file.FileInfo, false, file.OutputFileName);
return new ConverterConvertResultFile(storeAsync);
}
finally { mutex.Release(); }
});
convertResultDto.Files.AddRange(results);
但是,如果您混合执行 CPU 密集型和 I/O 密集型操作,那么您可能会想继续使用
ForEachAsync
。在这种情况下,您可以首先在目标集合中创建条目,然后使用索引执行每个操作,以便它知道将它们存储在哪里:
// This code assumes convertResultDto.Files is empty at this point.
var count = model.DestinationFiles.Count;
convertResultDto.Files.AddRange(Enumerable.Repeat<ConverterConvertResultFile>(null!, count));
await Parallel.ForEachAsync(
model.DestinationFiles.Select((file, i) => (file, i)),
new ParallelOptions { MaxDegreeOfParallelism = 20 },
async item =>
{
var (file, i) = item;
var storeAsync = await _fileServerService.Init(displayUrl).StoreAsync(file.FileInfo, false, file.OutputFileName);
convertResultDto.Files[i] = new ConverterConvertResultFile(storeAsync);
});
尝试按 OutputFileName 顺序进行处理: 等待 Parallel.ForEachAsync(model.DestinationFiles.OrderBy(f => f.OutputFileName),