这是我第一次使用 C# 进行异步操作,我在读取并发字典时遇到问题。 我有一种递归读取模块依赖关系的方法。由于这需要时间,我构建了它,因此它创建了多个线程。 为了避免从同一模块两次获取数据,我将处理后的结果添加到并发字典中,并尝试在下一次迭代中读取它。 代码如下:
internal delegate void WriteProgress(int id, string activity, string status);
internal delegate void WorkComplete(List<Work> all_work);
public class Helper
{
private readonly PSCmdlet _context;
private readonly HashSet<Tuple<string, Guid>> _printed;
private List<Work> _dep_chain;
private bool _completed;
private int _id;
private string _activity;
private string _status;
public Helper(PSCmdlet context)
{
_context = context;
_dep_chain = new();
_printed = new();
_completed = false;
_id = 0;
_activity = "Listing dependency chain";
_status = "Initializing...";
// Console.WriteLine("Helper constructor");
}
public void GetDependencyChainList(string lib_name, int max_concurrent_tasks)
{
// Console.WriteLine("Helper.GetDependencyChainList");
Worker worker = new(max_concurrent_tasks, OnWorkComplete, OnWriteProgress);
worker.EnqueueWork(lib_name, string.Empty, 0, DependencySource.None, Guid.NewGuid(), Guid.NewGuid());
//int dot_count = 1;
int progress_time = 0;
do
{
// if (progress_time >= 750)
// {
// if (dot_count >= 3)
// dot_count = 1;
// else
// dot_count++;
// progress_time = 0;
// }
// StringBuilder buffer = new();
// buffer.Append($"Listing dependency chain");
// buffer.Append('.', dot_count);
_context.WriteProgress(new(_id, _activity, _status));
Thread.Sleep(10);
progress_time += 10;
} while (!_completed);
GetTextListFromWorkList(_dep_chain);
}
private void GetTextListFromWorkList(List<Work> work_list)
{
foreach (Work work in from w in work_list orderby w.Depth select w)
{
Tuple<string, Guid> current = new(work.Name, work.Id);
if (_printed.Contains(current))
continue;
// Printing the parent.
PrintWork(work);
_printed.Add(current);
// Printing the children.
List<Work> children = work_list.Where(w => w.ParentId == work.Id).ToList();
GetTextListFromWorkList(children);
}
}
private void PrintWork(Work work)
{
string text = string.Concat(Enumerable.Repeat(" ", work.Depth));
text = string.Join("", text, $"{work.Name} (Id: {work.Id};Loaded: {work.Loaded}; Parent Id: {work.ParentId};Parent: {work.Parent}): {work.Path}");
// text = string.Join("", text, $"{work.Name} (Loaded: {work.Loaded}; Parent: {work.Parent}): {work.Path}");
_context.WriteObject(text);
}
private void OnWorkComplete(List<Work> dep_chain)
=> (_completed, _dep_chain) = (true, dep_chain);
private void OnWriteProgress(int id, string activity, string status)
=> (_id, _activity, _status) = (id, activity, status);
}
internal class Worker
{
private readonly List<Work> _result;
private readonly WorkComplete CompletedCallback;
private readonly WriteProgress ProgressCallback;
private readonly ConcurrentDictionary<Guid, Work> _completed_unique;
private readonly SemaphoreSlim _semaphore;
private int _queued_count;
private int _completed_count;
internal Worker(int max_thread_count, WorkComplete completed_callback, WriteProgress progress_callback)
{
_queued_count = 0;
_completed_count = 0;
_result = new();
_completed_unique = new();
_semaphore = new(max_thread_count);
_semaphore.Release(max_thread_count);
CompletedCallback = completed_callback;
ProgressCallback = progress_callback;
}
internal void EnqueueWork(string lib_name, string parent, int depth, DependencySource source, Guid id, Guid parent_id)
{
Work work = new()
{
Id = id,
ParentId = parent_id,
Name = lib_name,
Parent = parent,
Depth = depth,
Source = source,
State = WorkState.Queued,
Dependencies = new()
};
DoWork(work);
}
private void DoWork(object obj)
{
_queued_count++;
// Wait for the semaphore.
_semaphore.Wait();
Work work = obj as Work;
if (_completed_unique.TryGetValue(work.Id, out Work existent))
{
work.Path = existent.Path;
work.Loaded = existent.Loaded;
work.LoaderException = existent.LoaderException;
lock (_result)
_result.Add(work);
return;
}
_completed_unique.TryAdd(work.Id, work);
// Getting dependency information.
Random random = new();
Wrapper unwrapper = new();
LibInfo info = unwrapper.GetDependencyList(work.Name, work.Source);
work.Name = info.Name;
work.Path = info.Path;
work.Loaded = info.Loaded;
work.LoaderException = info.LoaderError;
// Creating an id, and task for each dependency.
if (info.Dependencies is not null && info.Dependencies.Length > 0)
{
work.Dependencies = new(info.Dependencies);
foreach (DependencyEntry entry in info.Dependencies)
{
Work dep_work = new()
{
Id = Guid.NewGuid(),
ParentId = work.Id,
Name = entry.Name,
Depth = work.Depth + 1,
Dependencies = new(),
Source = entry.Source
};
ThreadPool.QueueUserWorkItem(DoWork, dep_work);
}
}
// Storing the result.
lock (_result)
_result.Add(work);
// Checking if there are any pending tasks.
_completed_count++;
if (_completed_count == _queued_count)
CompletedCallback(_result);
// Progress test.
ProgressCallback(0, "Listing dependency chain", $"Queued: {_queued_count}; Completed: {_completed_count}");
_semaphore.Release();
}
}
public class Work
{
public Guid Id { get; set; }
public Guid ParentId { get; set; }
public string Name { get; set; }
public string Path { get; set; }
public int Depth { get; set; }
public string Parent { get; set; }
public bool Loaded { get; set; }
public Exception LoaderException { get; set; }
public List<DependencyEntry> Dependencies { get; set; }
public WorkState State { get; set; }
public DependencySource Source { get; set; }
}
public enum WorkState
{
Queued,
Running,
Completed
}
在
DoWork
上,我在继续之前检查了字典中是否存在该键,但总是失败。添加总是成功的。
在此实现中,我使用了
ThreadPool.QueueUserItem()
,但我也尝试了任务。
对于任务来说更糟糕的是,它会卡在某个地方,就像任务没有异步运行一样。
尝试了Task.Run()
和Task.Factory.StartNew()
。
有什么想法吗?
谢谢!