我正在尝试编写一个 C# 系统,它将多线程遍历树结构。另一种看待这个问题的方式是
BlockingCollection
的消费者也是生产者。
我遇到的问题是告诉一切何时完成。 我真正需要的测试是查看所有线程是否都在
TryTake
上。
如果是的话,那么一切都完成了,但我找不到一种方法来测试它,或者用任何有助于实现这一目标的东西来包装它。
据我所知,下面的代码是该代码的一个非常简单的示例,但在某种情况下该代码可能会失败。如果第一个线程刚刚通过
test.TryTake(out v,-1)
并且尚未执行 s.Release();
并且它只是从集合中拉出最后一项,而第二个线程刚刚执行 if (s.CurrentCount == 0 && test.Count == 0)
这可能会返回 true,并错误地开始完成事情向上。
但是第一个线程将继续并尝试向集合中添加更多内容。
如果我能画出台词:
if (!test.TryTake(out v, -1))
break;
s.Release();
atomic 那么我相信这段代码可以工作。 (这显然是不可能的。)
但我不知道如何修复这个缺陷。
class Program
{
private static BlockingCollection<int> test;
static void Main(string[] args)
{
test = new BlockingCollection<int>();
WorkClass.s = new SemaphoreSlim(2);
WorkClass w0 = new WorkClass("A");
WorkClass w1 = new WorkClass("B");
Thread t0 = new Thread(w0.WorkFunction);
Thread t1 = new Thread(w1.WorkFunction);
test.Add(10);
t0.Start();
t1.Start();
t0.Join();
t1.Join();
Console.WriteLine("Done");
Console.ReadLine();
}
class WorkClass
{
public static SemaphoreSlim s;
private readonly string _name;
public WorkClass(string name)
{
_name = name;
}
public void WorkFunction(object t)
{
while (true)
{
int v;
s.Wait();
if (s.CurrentCount == 0 && test.Count == 0)
test.CompleteAdding();
if (!test.TryTake(out v, -1))
break;
s.Release();
Console.WriteLine(_name + " = " + v);
Thread.Sleep(5);
for (int i = 0; i < v; i++)
test.Add(i);
}
Console.WriteLine("Done " + _name);
}
}
}
这可以使用任务并行性来并行化。树中的每个节点都被视为一个可能产生子任务的任务。有关更详细的说明,请参阅动态任务并行性。
对于具有 5 个级别的二叉树,将每个节点写入控制台并等待 5 毫秒(如示例所示),ParallelWalk 方法将按如下所示查找示例:
class Program
{
internal class TreeNode
{
internal TreeNode(int level)
{
Level = level;
}
internal int Level { get; }
}
static void Main(string[] args)
{
ParallelWalk(new TreeNode(0));
Console.Read();
}
static void ParallelWalk(TreeNode node)
{
if (node == null) return;
Console.WriteLine(node.Level);
Thread.Sleep(5);
if(node.Level > 4) return;
int nextLevel = node.Level + 1;
var t1 = Task.Factory.StartNew(
() => ParallelWalk(new TreeNode(nextLevel)));
var t2 = Task.Factory.StartNew(
() => ParallelWalk(new TreeNode(nextLevel)));
Task.WaitAll(t1, t2);
}
}
中心线是任务 t1 和 t2 产生的地方。
通过任务的这种分解,调度由任务并行库完成,您不必再管理一组共享节点。