ConcurrentDictionary 不适用于递归异步方法

问题描述 投票:0回答:0

这是我第一次使用 C# 进行异步操作,我在读取并发字典时遇到问题。 我有一种递归读取模块依赖关系的方法。由于这需要时间,我构建了它,因此它创建了多个线程。 为了避免从同一模块两次获取数据,我将处理后的结果添加到并发字典中,并尝试在下一次迭代中读取它。 代码如下:

internal delegate void WriteProgress(int id, string activity, string status);
internal delegate void WorkComplete(List<Work> all_work);

public class Helper
{
    private readonly PSCmdlet _context;
    private readonly HashSet<Tuple<string, Guid>> _printed;

    private List<Work> _dep_chain;
    private bool _completed;

    private int _id;
    private string _activity;
    private string _status;

    public Helper(PSCmdlet context)
    {
        _context = context;
        _dep_chain = new();
        _printed = new();
        _completed = false;

        _id = 0;
        _activity = "Listing dependency chain";
        _status = "Initializing...";

        // Console.WriteLine("Helper constructor");
    }

    public void GetDependencyChainList(string lib_name, int max_concurrent_tasks)
    {
        // Console.WriteLine("Helper.GetDependencyChainList");
        Worker worker = new(max_concurrent_tasks, OnWorkComplete, OnWriteProgress);
        worker.EnqueueWork(lib_name, string.Empty, 0, DependencySource.None, Guid.NewGuid(), Guid.NewGuid());

        //int dot_count = 1;
        int progress_time = 0;
        do
        {
            // if (progress_time >= 750)
            // {
            //     if (dot_count >= 3)
            //         dot_count = 1;
            //     else
            //         dot_count++;

            //     progress_time = 0;
            // }

            // StringBuilder buffer = new();
            // buffer.Append($"Listing dependency chain");
            // buffer.Append('.', dot_count);
            _context.WriteProgress(new(_id, _activity, _status));

            Thread.Sleep(10);
            progress_time += 10;

        } while (!_completed);

        GetTextListFromWorkList(_dep_chain);
    }

    private void GetTextListFromWorkList(List<Work> work_list)
    {
        foreach (Work work in from w in work_list orderby w.Depth select w)
        {
            Tuple<string, Guid> current = new(work.Name, work.Id);
            if (_printed.Contains(current))
                continue;

            // Printing the parent.
            PrintWork(work);
            _printed.Add(current);

            // Printing the children.
            List<Work> children = work_list.Where(w => w.ParentId == work.Id).ToList();
            GetTextListFromWorkList(children);
        }
    }

    private void PrintWork(Work work)
    {
        string text = string.Concat(Enumerable.Repeat("  ", work.Depth));
        text = string.Join("", text, $"{work.Name} (Id: {work.Id};Loaded: {work.Loaded}; Parent Id: {work.ParentId};Parent: {work.Parent}): {work.Path}");
        // text = string.Join("", text, $"{work.Name} (Loaded: {work.Loaded}; Parent: {work.Parent}): {work.Path}");

        _context.WriteObject(text);
    }

    private void OnWorkComplete(List<Work> dep_chain)
        => (_completed, _dep_chain) = (true, dep_chain);

    private void OnWriteProgress(int id, string activity, string status)
        => (_id, _activity, _status) = (id, activity, status);
}

internal class Worker
{
    private readonly List<Work> _result;
    private readonly WorkComplete CompletedCallback;
    private readonly WriteProgress ProgressCallback;
    private readonly ConcurrentDictionary<Guid, Work> _completed_unique;

    private readonly SemaphoreSlim _semaphore;

    private int _queued_count;
    private int _completed_count;

    internal Worker(int max_thread_count, WorkComplete completed_callback, WriteProgress progress_callback)
    {
        _queued_count = 0;
        _completed_count = 0;

        _result = new();
        _completed_unique = new();

        _semaphore = new(max_thread_count);
        _semaphore.Release(max_thread_count);

        CompletedCallback = completed_callback;
        ProgressCallback = progress_callback;
    }

    internal void EnqueueWork(string lib_name, string parent, int depth, DependencySource source, Guid id, Guid parent_id)
    {
        Work work = new()
        {
            Id = id,
            ParentId = parent_id,
            Name = lib_name,
            Parent = parent,
            Depth = depth,
            Source = source,
            State = WorkState.Queued,
            Dependencies = new()
        };

        DoWork(work);
    }

    private void DoWork(object obj)
    {
        _queued_count++;

        // Wait for the semaphore.
        _semaphore.Wait();

        Work work = obj as Work;
        if (_completed_unique.TryGetValue(work.Id, out Work existent))
        {
            work.Path = existent.Path;
            work.Loaded = existent.Loaded;
            work.LoaderException = existent.LoaderException;

            lock (_result)
                _result.Add(work);

            return;
        }

        _completed_unique.TryAdd(work.Id, work);

        // Getting dependency information.
        Random random = new();
        Wrapper unwrapper = new();

        LibInfo info = unwrapper.GetDependencyList(work.Name, work.Source);

        work.Name = info.Name;
        work.Path = info.Path;
        work.Loaded = info.Loaded;
        work.LoaderException = info.LoaderError;

        // Creating an id, and task for each dependency.
        if (info.Dependencies is not null && info.Dependencies.Length > 0)
        {
            work.Dependencies = new(info.Dependencies);
            foreach (DependencyEntry entry in info.Dependencies)
            {
                Work dep_work = new()
                {
                    Id = Guid.NewGuid(),
                    ParentId = work.Id,
                    Name = entry.Name,
                    Depth = work.Depth + 1,
                    Dependencies = new(),
                    Source = entry.Source
                };

                ThreadPool.QueueUserWorkItem(DoWork, dep_work);
            }
        }

        // Storing the result.
        lock (_result)
            _result.Add(work);

        // Checking if there are any pending tasks.
        _completed_count++;
        if (_completed_count == _queued_count)
            CompletedCallback(_result);

        // Progress test.
        ProgressCallback(0, "Listing dependency chain", $"Queued: {_queued_count}; Completed: {_completed_count}");

        _semaphore.Release();
    }
}

public class Work
{
    public Guid Id { get; set; }
    public Guid ParentId { get; set; }
    public string Name { get; set; }
    public string Path { get; set; }
    public int Depth { get; set; }
    public string Parent { get; set; }
    public bool Loaded { get; set; }
    public Exception LoaderException { get; set; }
    public List<DependencyEntry> Dependencies { get; set; }
    public WorkState State { get; set; }
    public DependencySource Source { get; set; }
}

public enum WorkState
{
    Queued,
    Running,
    Completed
}

DoWork
上,我在继续之前检查了字典中是否存在该键,但总是失败。添加总是成功的。

在此实现中,我使用了

ThreadPool.QueueUserItem()
,但我也尝试了任务。 对于任务来说更糟糕的是,它会卡在某个地方,就像任务没有异步运行一样。 尝试了
Task.Run()
Task.Factory.StartNew()

有什么想法吗?
谢谢!

c# .net asynchronous task concurrentdictionary
© www.soinside.com 2019 - 2024. All rights reserved.