如何将.NET Framework中的Parallel.ForEach移植到异步IO?

问题描述 投票:0回答:1

我需要处理一个大存储库(源存储库)并在另一个存储库(元数据存储库)中创建/更新(或保持完整)大约 2M 个文件。

我当前的实现不使用异步 I/O,尽管我很乐意这样做。有一个复杂的情况,这使我无法找到好的解决方案。

事实上,我的代码是生产者/消费者管道的消费者部分,其中一些代码生成元数据文件的内容,将其放入阻塞集合(我称之为bus),并且我的代码同时从总线消费多线程。

但更重要的是,它包含两个不同的阶段:

  1. 第 1 阶段 - 收集所有现有元数据文件名称的后台任务正在运行。在此阶段:
    • 代码使用
      File.Exists
      检查元数据文件是否存在。
    • 访问过的元数据文件的名称记录在名为
      processedFiles
      的专用集合中。
  2. 第 2 阶段 - 上述后台任务完成后,所有现有元数据文件的名称都被收集到
    existingFiles
    集合中。在此阶段:
    • 代码通过在
      existingFiles
      集合中查找来检查元数据文件是否存在。
    • 已访问元数据文件的名称将从
      existingFiles
      集合中删除。

面临的挑战是实现两个阶段之间的转换,而不会在总线上丢失任何“数据包”。

我能够相对轻松地使用

Parallel.ForEach
进行编码,但现在我想使用异步 I/O,但我陷入了困境,因为 .NET Framework 没有
Parallel.ForEachAsync
并且我必须 使用 .NET Framework。我需要另一种方法。

我想展示我的代码的要点,主要是展示我如何实现阶段之间的转换。

private class EndWorkItemsParallelState
{
    public readonly StringBuilder StringBuilder = new(1000);
    public readonly Guid Guid = Guid.NewGuid();
}

private Task GetEndWorkItemsConsumer(BlockingCollection<EndWorkItem> endWorkItems, int concurrency) => Task.Factory.StartNew(() =>
{
    ConcurrentDictionary<string, object> existingFiles = null;
    ConcurrentDictionary<string, object> processedFiles = new(C.IgnoreCase);

    ConcurrentDictionary<Guid, ManualResetEventSlim> locks = [];
    object modeSwitchGuard = new();

    Func<string, byte[], bool> saveJsonFileIfDirty = (filePath, newContent) => SaveIfDirty1(filePath, newContent, processedFiles);

    Parallel.ForEach(endWorkItems.GetConsumingEnumerable(), new ParallelOptions
    {
        MaxDegreeOfParallelism = concurrency
    }, () =>
    {
        EndWorkItemsParallelState state = new();
        if (existingFiles == null)
        {
            locks[state.Guid] = new ManualResetEventSlim();
        }
        return state;
    }, (endWorkItem, loop, state) =>
    {
        ProcessEndWorkItem(endWorkItem, saveJsonFileIfDirty);

        if (existingFiles == null && m_collectExistingFilesTask.IsCompleted)
        {
            locks[state.Guid].Set();
            lock (modeSwitchGuard)
            {
                if (existingFiles == null)
                {
                    foreach (var @lock in locks.Values)
                    {
                        @lock.Wait();
                    }

                    existingFiles = m_collectExistingFilesTask.Result;
                    foreach (var processedFile in processedFiles.Keys)
                    {
                        existingFiles.TryRemove(processedFile, out _);
                    }
                    processedFiles = null;
                    
                    saveJsonFileIfDirty = (filePath, newContent) => SaveIfDirty2(filePath, newContent, existingFiles);
                }
            }
        }

        return state;
    }, state =>
    {
        if (locks.TryGetValue(state.Guid, out var @lock))
        {
            @lock.Set();
        }
    });

    locks.Values.ForEach(o => o.Dispose());

    DeleteStaleFiles(existingFiles.Keys);

    void ProcessEndWorkItem(EndWorkItem endWorkItem, Func<string, byte[], bool> saveJsonFileIfDirty){ ... }
    static bool SaveIfDirty1(string filePath, byte[] newContent, ConcurrentDictionary<string, object> processedFiles){ ... }
    static bool SaveIfDirty2(string filePath, byte[] newContent, ConcurrentDictionary<string, object> existingFiles){ ... }
}, TaskCreationOptions.LongRunning);

转换在这里实现:

if (existingFiles == null && m_collectExistingFilesTask.IsCompleted)
{
    locks[state.Guid].Set();
    lock (modeSwitchGuard)
    {
        if (existingFiles == null)
        {
            foreach (var @lock in locks.Values)
            {
                @lock.Wait();
            }

            existingFiles = m_collectExistingFilesTask.Result;
            foreach (var processedFile in processedFiles.Keys)
            {
                existingFiles.TryRemove(processedFile, out _);
            }
            processedFiles = null;

            saveJsonFileIfDirty = (filePath, newContent) => SaveIfDirty2(filePath, newContent, existingFiles);
        }
    }
}

当一个线程检测到阶段 1 应该结束时,它会尝试进入临界区(只有一个成功),然后等待所有其他线程命中临界区并在其上阻塞。它知道其他线程何时到达它,因为每个线程都有自己的

ManualResetEventSlim
实例,当它们到达临界区时,它们会发出信号。其中的线程等待所有这些信号。一旦所有设置完成,这意味着所有线程都被阻塞在关键部分,并且可以安全地执行阶段之间的转换。

到目前为止,我未能将此逻辑移植到异步 I/O。也许是因为我的转换逻辑太复杂了。

如有任何建议,我们将不胜感激。

c# multithreading asynchronous producer-consumer .net-4.7.2
1个回答
0
投票

我刚刚与 Chat GPT 免费层进行了一次有趣的讨论。

这确实是一件很了不起的事情。这是已保存聊天的链接 - https://chatgpt.com/share/66e90794-aea4-8004-82d4-7804f7454019

我还没有测试过,但它的答案(稍作修改)是这样的:

private async Task GetEndWorkItemsConsumerAsync(BlockingCollection<EndWorkItem> endWorkItems, int concurrency)
{
    ConcurrentDictionary<string, object> existingFiles = null;
    ConcurrentDictionary<string, object> processedFiles = new(StringComparer.OrdinalIgnoreCase);

    var locks = new TaskCompletionSource<bool>[concurrency];
    for (int i = 0; i < concurrency; ++i)
    {
        locks[i] = new();
    }

    SemaphoreSlim modeSwitchSemaphore = new SemaphoreSlim(1, 1);

    Func<string, byte[], Task<bool>> saveJsonFileIfDirty = async (filePath, newContent) =>
        await SaveIfDirty1Async(filePath, newContent, processedFiles);

    var tasks = Enumerable.Range(0, concurrency).Select(async i =>
    {
        while (await endWorkItems.Reader.WaitToReadAsync())
        {
            while (endWorkItems.Reader.TryRead(out var endWorkItem))
            {
                await ProcessEndWorkItemAsync(endWorkItem, saveJsonFileIfDirty);

                if (existingFiles == null && m_collectExistingFilesTask.IsCompleted)
                {
                    locks[i].TrySetResult(true); // Signal readiness for phase transition

                    // Phase transition management
                    await modeSwitchSemaphore.WaitAsync();
                    try
                    {
                        if (existingFiles == null)
                        {
                            // Ensure all tasks have arrived before transitioning
                            var lockTasks = locks.Values.Select(tcs => tcs.Task);
                            await Task.WhenAll(lockTasks);  // Wait for all threads to synchronize

                            // Perform the actual phase transition
                            existingFiles = m_collectExistingFilesTask.Result;
                            foreach (var processedFile in processedFiles.Keys)
                            {
                                existingFiles.TryRemove(processedFile, out _);
                            }

                            // Update the delegate for Phase 2
                            saveJsonFileIfDirty = async (filePath, newContent) =>
                                await SaveIfDirty2Async(filePath, newContent, existingFiles);

                            processedFiles = null;
                        }
                    }
                    finally
                    {
                        // Release the semaphore to allow other threads to proceed
                        modeSwitchSemaphore.Release();
                    }
                }
            }
        }
    });

    await Task.WhenAll(tasks);

    // Clean up stale files after transition
    if (existingFiles != null)
    {
        await DeleteStaleFilesAsync(existingFiles.Keys);
    }
}

private async Task ProcessEndWorkItemAsync(EndWorkItem endWorkItem, Func<string, byte[], Task<bool>> saveJsonFileIfDirty)
{
    // Process the work item asynchronously
    // ...
}

private async Task<bool> SaveIfDirty1Async(string filePath, byte[] newContent, ConcurrentDictionary<string, object> processedFiles)
{
    // Perform async file operations in Phase 1
    // ...
}

private async Task<bool> SaveIfDirty2Async(string filePath, byte[] newContent, ConcurrentDictionary<string, object> existingFiles)
{
    // Perform async file operations in Phase 2
    // ...
}

private async Task DeleteStaleFilesAsync(IEnumerable<string> filePaths)
{
    // Delete stale files asynchronously
    // ...
}

我认为这很神奇,即使它不起作用,但我目前还不知道。稍后检查。

© www.soinside.com 2019 - 2024. All rights reserved.