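I need to process a large repository (the source repository) and create, update, or leave intact about 2M files in another repository (the metadata repository).
My current implementation does not use async I/O, although I would be happy to use it. There is one complication that keeps me from finding a good solution.
My code is the consumer side of a producer/consumer pipeline. Some code generates the content of the metadata files and puts it into a blocking collection (which I call the bus), and my code consumes from the bus concurrently on multiple threads.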
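More importantly, the processing has two distinct phases:

1. Phase 1: the existing metadata files have not been collected yet (m_collectExistingFilesTask is still running). In this phase:
   - File.Exists is used to check whether a metadata file exists.
   - Every processed file is recorded in a dedicated collection, processedFiles.
2. Phase 2: the existing metadata files have been collected into the existingFiles collection. In this phase:
   - Whether a metadata file exists is checked by looking it up in the existingFiles collection.
   - Every processed file is removed from the existingFiles collection.

The challenge is to implement the transition between the two phases without losing any "packet" on the bus.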
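The invariant behind the two phases can be shown with a deliberately single-threaded model. TwoPhaseBookkeeping and its members are hypothetical names for illustration, not the real code. A file marked in phase 1 goes into processedFiles, the switch subtracts processedFiles from the collected existing files, a file marked in phase 2 is removed from existingFiles directly, and whatever is left at the end is stale. It also shows what a lost packet would mean in the concurrent version: a phase-1 save that records into processedFiles after the switch has already walked that collection is never subtracted, so that file would be wrongly deleted as stale.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Hypothetical single-threaded model of the two-phase bookkeeping (names are illustrative).
public sealed class TwoPhaseBookkeeping
{
    private ConcurrentDictionary<string, object> _processedFiles =
        new ConcurrentDictionary<string, object>(StringComparer.OrdinalIgnoreCase);
    private ConcurrentDictionary<string, object> _existingFiles; // null while in phase 1

    public bool InPhase2 => _existingFiles != null;

    // Called once per file the consumer writes or leaves intact.
    public void MarkProcessed(string path)
    {
        if (_existingFiles == null)
            _processedFiles.TryAdd(path, null);    // phase 1: remember what was touched
        else
            _existingFiles.TryRemove(path, out _); // phase 2: cross it off the existing set
    }

    // The transition. In the real pipeline this must run while no consumer is mid-item.
    public void SwitchToPhase2(IEnumerable<string> collectedExistingFiles)
    {
        var existing = new ConcurrentDictionary<string, object>(StringComparer.OrdinalIgnoreCase);
        foreach (var file in collectedExistingFiles) existing.TryAdd(file, null);
        foreach (var file in _processedFiles.Keys) existing.TryRemove(file, out _);
        _processedFiles = null;
        _existingFiles = existing;
    }

    // Only meaningful after SwitchToPhase2: files no work item touched are stale.
    public IReadOnlyCollection<string> StaleFiles() => _existingFiles.Keys.ToList();
}
```

Note that the comparer is case-insensitive, matching the OrdinalIgnoreCase comparer used for processedFiles in the question's code, so "a.json" processed in phase 1 cancels "A.json" found on disk.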
I was able to code this relatively easily with Parallel.ForEach. Now I want to use async I/O, and I am stuck: Parallel.ForEachAsync only exists in .NET 6 and later, and I have to use .NET Framework. I need a different approach.
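Without Parallel.ForEachAsync, one common substitute on .NET Framework is to start a fixed number of worker tasks that drain an async queue. The sketch below assumes the bus is changed from BlockingCollection<T> (which has no async take) to a System.Threading.Channels channel, which is a NuGet package on .NET Framework and built in on .NET Core 3.0 and later. AsyncParallel.ForEachAsync is a hypothetical helper, not a BCL API. It loosely mirrors the localInit/body/localFinally overload of Parallel.ForEach used in the question's code, except that the body returns a Task instead of the local state.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Channels; // NuGet package on .NET Framework
using System.Threading.Tasks;

public static class AsyncParallel
{
    // Hypothetical helper: starts degreeOfParallelism workers that drain reader.
    // Each worker gets its own state from localInit (like TLocal in Parallel.ForEach),
    // and localFinally always runs for it, even if the body throws.
    public static Task ForEachAsync<T, TLocal>(
        ChannelReader<T> reader,
        int degreeOfParallelism,
        Func<TLocal> localInit,
        Func<T, TLocal, Task> body,
        Action<TLocal> localFinally)
    {
        var workers = Enumerable.Range(0, degreeOfParallelism).Select(_ => Task.Run(async () =>
        {
            TLocal state = localInit();
            try
            {
                // WaitToReadAsync returns false once the writer is completed and the channel is empty.
                while (await reader.WaitToReadAsync().ConfigureAwait(false))
                {
                    while (reader.TryRead(out T item))
                        await body(item, state).ConfigureAwait(false);
                }
            }
            finally
            {
                localFinally(state);
            }
        })).ToArray();
        return Task.WhenAll(workers);
    }
}
```

Unlike Parallel.ForEach, an exception in one body does not stop the other workers here; they keep draining the channel, and Task.WhenAll reports the failure once all workers have finished.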
Here is the gist of my code, mainly to show how I implemented the transition between the phases.
```csharp
private class EndWorkItemsParallelState
{
    public readonly StringBuilder StringBuilder = new(1000);
    public readonly Guid Guid = Guid.NewGuid();
}

private Task GetEndWorkItemsConsumer(BlockingCollection<EndWorkItem> endWorkItems, int concurrency) => Task.Factory.StartNew(() =>
{
    ConcurrentDictionary<string, object> existingFiles = null;
    ConcurrentDictionary<string, object> processedFiles = new(C.IgnoreCase);
    ConcurrentDictionary<Guid, ManualResetEventSlim> locks = [];
    object modeSwitchGuard = new();
    Func<string, byte[], bool> saveJsonFileIfDirty = (filePath, newContent) => SaveIfDirty1(filePath, newContent, processedFiles);

    Parallel.ForEach(endWorkItems.GetConsumingEnumerable(), new ParallelOptions
    {
        MaxDegreeOfParallelism = concurrency
    }, () =>
    {
        EndWorkItemsParallelState state = new();
        if (existingFiles == null)
        {
            locks[state.Guid] = new ManualResetEventSlim();
        }
        return state;
    }, (endWorkItem, loop, state) =>
    {
        ProcessEndWorkItem(endWorkItem, saveJsonFileIfDirty);
        if (existingFiles == null && m_collectExistingFilesTask.IsCompleted)
        {
            locks[state.Guid].Set();
            lock (modeSwitchGuard)
            {
                if (existingFiles == null)
                {
                    foreach (var @lock in locks.Values)
                    {
                        @lock.Wait();
                    }
                    existingFiles = m_collectExistingFilesTask.Result;
                    foreach (var processedFile in processedFiles.Keys)
                    {
                        existingFiles.TryRemove(processedFile, out _);
                    }
                    processedFiles = null;
                    saveJsonFileIfDirty = (filePath, newContent) => SaveIfDirty2(filePath, newContent, existingFiles);
                }
            }
        }
        return state;
    }, state =>
    {
        if (locks.TryGetValue(state.Guid, out var @lock))
        {
            @lock.Set();
        }
    });

    locks.Values.ForEach(o => o.Dispose());
    DeleteStaleFiles(existingFiles.Keys);

    void ProcessEndWorkItem(EndWorkItem endWorkItem, Func<string, byte[], bool> saveJsonFileIfDirty){ ... }
    static bool SaveIfDirty1(string filePath, byte[] newContent, ConcurrentDictionary<string, object> processedFiles){ ... }
    static bool SaveIfDirty2(string filePath, byte[] newContent, ConcurrentDictionary<string, object> existingFiles){ ... }
}, TaskCreationOptions.LongRunning);
```
The transition is implemented here:
```csharp
if (existingFiles == null && m_collectExistingFilesTask.IsCompleted)
{
    locks[state.Guid].Set();
    lock (modeSwitchGuard)
    {
        if (existingFiles == null)
        {
            foreach (var @lock in locks.Values)
            {
                @lock.Wait();
            }
            existingFiles = m_collectExistingFilesTask.Result;
            foreach (var processedFile in processedFiles.Keys)
            {
                existingFiles.TryRemove(processedFile, out _);
            }
            processedFiles = null;
            saveJsonFileIfDirty = (filePath, newContent) => SaveIfDirty2(filePath, newContent, existingFiles);
        }
    }
}
```
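When a thread detects that phase 1 should end, it tries to enter the critical section (only one thread gets in at a time) and then waits for all the other threads to reach the critical section and block on it. It knows when the other threads have arrived because each thread has its own ManualResetEventSlim instance, which it signals when it reaches the critical section (a thread that runs out of work signals it in localFinally instead). The thread inside the critical section waits for all of these signals. Once all of them are set, every other thread is blocked at the critical section, and the transition between the phases can be performed safely.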
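The same handshake can be expressed with async primitives: a TaskCompletionSource per worker replaces the ManualResetEventSlim, Task.WhenAll replaces the loop of Wait() calls, and SemaphoreSlim.WaitAsync replaces lock, because a lock block cannot contain an await. AsyncPhaseSwitch, ArriveAsync and Leave are hypothetical names, and this is a sketch of the idea under those assumptions, not something tested against the real pipeline.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical async counterpart of the ManualResetEventSlim + lock scheme.
public sealed class AsyncPhaseSwitch
{
    private readonly TaskCompletionSource<bool>[] _arrived; // one signal per worker
    private readonly SemaphoreSlim _guard = new SemaphoreSlim(1, 1); // replaces lock (modeSwitchGuard)
    private volatile bool _switched;

    public AsyncPhaseSwitch(int workerCount)
    {
        // RunContinuationsAsynchronously keeps TrySetResult from running the
        // waiting switcher inline on the signalling worker.
        _arrived = Enumerable.Range(0, workerCount)
            .Select(_ => new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously))
            .ToArray();
    }

    public bool Switched => _switched;

    // Called by worker `index` between work items once phase 1 should end.
    public async Task ArriveAsync(int index, Action transition)
    {
        _arrived[index].TrySetResult(true); // "I am parked between items"
        await _guard.WaitAsync().ConfigureAwait(false);
        try
        {
            if (!_switched)
            {
                // Wait until every worker has either arrived here or left.
                await Task.WhenAll(_arrived.Select(tcs => tcs.Task)).ConfigureAwait(false);
                transition();
                _switched = true;
            }
        }
        finally
        {
            _guard.Release();
        }
    }

    // Call from a finally block when worker `index` stops consuming (the localFinally analogue).
    public void Leave(int index) => _arrived[index].TrySetResult(true);
}
```

Each worker would call ArriveAsync between work items while Switched is false and m_collectExistingFilesTask.IsCompleted, and call Leave in a finally block when it stops consuming. Both calls matter: a worker that is not parked between items could still be writing to processedFiles during the transition, and a worker that exits without signalling would make Task.WhenAll wait forever.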
So far I have failed to port this logic to async I/O. Perhaps my transition logic is too complicated.
Any suggestions would be appreciated.
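I just had an interesting discussion with the free tier of ChatGPT.
It really is remarkable. Here is the link to the saved chat: https://chatgpt.com/share/66e90794-aea4-8004-82d4-7804f7454019
I have not tested it yet, but its answer (slightly modified) is this: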
```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels; // NuGet package on .NET Framework
using System.Threading.Tasks;

// The bus has to be a Channel here: BlockingCollection<T> has no Reader/WaitToReadAsync.
private async Task GetEndWorkItemsConsumerAsync(ChannelReader<EndWorkItem> endWorkItems, int concurrency)
{
    ConcurrentDictionary<string, object> existingFiles = null;
    ConcurrentDictionary<string, object> processedFiles = new(StringComparer.OrdinalIgnoreCase);
    // One arrival signal per worker; RunContinuationsAsynchronously keeps TrySetResult
    // from running the waiting switcher inline on the signalling worker.
    var locks = new TaskCompletionSource<bool>[concurrency];
    for (int i = 0; i < concurrency; ++i)
    {
        locks[i] = new(TaskCreationOptions.RunContinuationsAsynchronously);
    }
    SemaphoreSlim modeSwitchSemaphore = new SemaphoreSlim(1, 1);
    Func<string, byte[], Task<bool>> saveJsonFileIfDirty = (filePath, newContent) =>
        SaveIfDirty1Async(filePath, newContent, processedFiles);
    var tasks = Enumerable.Range(0, concurrency).Select(async i =>
    {
        try
        {
            while (await endWorkItems.WaitToReadAsync())
            {
                while (endWorkItems.TryRead(out var endWorkItem))
                {
                    await ProcessEndWorkItemAsync(endWorkItem, saveJsonFileIfDirty);
                    if (existingFiles == null && m_collectExistingFilesTask.IsCompleted)
                    {
                        locks[i].TrySetResult(true); // Signal readiness for the phase transition
                        // Async replacement for lock (modeSwitchGuard): a lock block cannot contain await
                        await modeSwitchSemaphore.WaitAsync();
                        try
                        {
                            if (existingFiles == null)
                            {
                                // Wait until every worker has arrived here or has exited
                                await Task.WhenAll(locks.Select(tcs => tcs.Task));
                                // Perform the actual phase transition (the task is already completed)
                                existingFiles = await m_collectExistingFilesTask;
                                foreach (var processedFile in processedFiles.Keys)
                                {
                                    existingFiles.TryRemove(processedFile, out _);
                                }
                                // Update the delegate for phase 2
                                saveJsonFileIfDirty = (filePath, newContent) =>
                                    SaveIfDirty2Async(filePath, newContent, existingFiles);
                                processedFiles = null;
                            }
                        }
                        finally
                        {
                            // Release the semaphore; waiting workers will see existingFiles != null
                            modeSwitchSemaphore.Release();
                        }
                    }
                }
            }
        }
        finally
        {
            // Like localFinally in the Parallel.ForEach version: a worker that stops consuming
            // must still signal, otherwise the Task.WhenAll over locks would never complete.
            locks[i].TrySetResult(true);
        }
    }).ToArray();
    await Task.WhenAll(tasks);
    // Clean up stale files (only possible if the transition happened)
    if (existingFiles != null)
    {
        await DeleteStaleFilesAsync(existingFiles.Keys);
    }
}

private async Task ProcessEndWorkItemAsync(EndWorkItem endWorkItem, Func<string, byte[], Task<bool>> saveJsonFileIfDirty)
{
    // Process the work item asynchronously
    // ...
}

private async Task<bool> SaveIfDirty1Async(string filePath, byte[] newContent, ConcurrentDictionary<string, object> processedFiles)
{
    // Perform async file operations in phase 1
    // ...
}

private async Task<bool> SaveIfDirty2Async(string filePath, byte[] newContent, ConcurrentDictionary<string, object> existingFiles)
{
    // Perform async file operations in phase 2
    // ...
}

private async Task DeleteStaleFilesAsync(IEnumerable<string> filePaths)
{
    // Delete stale files asynchronously
    // ...
}
```
I think this is amazing, even if it turns out not to work, which I do not know yet. I will check later.