ForAll
运算符结尾的 PLINQ 查询,并使用 WithCancellation
运算符来中途取消查询。令人惊讶的是,查询并没有被取消。以下是此行为的最小演示:
CancellationTokenSource cts = new CancellationTokenSource(1000);
cts.Token.Register(() => Console.WriteLine("--Token Canceled"));
try
{
Enumerable.Range(1, 20)
.AsParallel()
.WithDegreeOfParallelism(2)
.WithCancellation(cts.Token)
.ForAll(x =>
{
Console.WriteLine($"Processing item #{x}");
Thread.Sleep(200);
//cts.Token.ThrowIfCancellationRequested();
});
Console.WriteLine($"The query was completed successfully");
}
catch (OperationCanceledException)
{
Console.WriteLine($"The query was canceled");
}
在线演示.
输出(不需要):
Processing item #1
Processing item #2
Processing item #4
Processing item #3
Processing item #5
Processing item #6
Processing item #8
Processing item #7
Processing item #10
Processing item #9
--Token Canceled
Processing item #11
Processing item #12
Processing item #13
Processing item #14
Processing item #15
Processing item #16
Processing item #17
Processing item #19
Processing item #20
Processing item #18
The query was canceled
查询以
OperationCanceledException
结束,但在处理所有 20 个项目之前完成。当我取消注释 cts.Token.ThrowIfCancellationRequested();
行时,就会出现理想的行为。
输出(期望):
Processing item #2
Processing item #1
Processing item #3
Processing item #4
Processing item #5
Processing item #6
Processing item #7
Processing item #8
Processing item #9
Processing item #10
--Token Canceled
The query was canceled
我做错了什么,还是这是
ForAll
+WithCancellation
组合的设计行为?或者这是 PLINQ 库中的错误?
这似乎是设计使然,但逻辑与您想象的有点不同。如果我们深入研究一下源代码,我们会发现相关的
ForAll
实现here:
while (_source.MoveNext(ref element, ref keyUnused))
{
if ((i++ & CancellationState.POLL_INTERVAL) == 0)
_cancellationToken.ThrowIfCancellationRequested();
_elementAction(element);
}
所以它确实检查取消,但不是每次迭代。如果我们检查
CancellationState.POLL_INTERVAL
:
/// <summary>
/// Poll frequency (number of loops per cancellation check) for situations where per-1-loop testing is too high an overhead.
/// </summary>
internal const int POLL_INTERVAL = 63; //must be of the form (2^n)-1.
// The two main situations requiring POLL_INTERVAL are:
// 1. inner loops of sorting/merging operations
// 2. tight loops that perform very little work per MoveNext call.
// Testing has shown both situations have similar requirements and can share the same constant for polling interval.
//
// Because the poll checks are per-N loops, if there are delays in user code, they may affect cancellation timeliness.
// Guidance is that all user-delegates should perform cancellation checks at least every 1ms.
//
// Inner loop code should poll once per n loop, typically via:
// if ((i++ & CancellationState.POLL_INTERVAL) == 0)
// _cancellationToken.ThrowIfCancellationRequested();
// (Note, this only behaves as expected if FREQ is of the form (2^n)-1
所以基本上 PLINQ 开发人员假设您在
ForAll
(以及类似的方法)中有一个非常快的代码,因此他们认为每次迭代检查取消是浪费的,因此他们每 64 次迭代检查一次。如果您有长时间运行的代码 - 您可以自行检查是否取消。我想他们必须这样做,因为在这种情况下他们无法在所有情况下都做正确的事情,但是如果他们检查每次迭代 - 你将无法避免性能成本。
如果增加代码中的迭代次数并调整取消超时 - 您会发现它确实会在大约 64 次迭代后取消(在每个分区上,总共 128 次)。
Evk 的回答 彻底解释了观察到的行为:PLINQ 运算符定期检查取消令牌,而不是针对每个处理的项目。我寻找一种方法来改变这种行为,我想我找到了一种方法。当使用
foreach
循环枚举并行查询时,每次迭代都会检查取消标记。所以这是我想出的解决方案:
/// <summary>
/// Invokes in parallel the specified action for each element in the source,
/// checking the associated CancellationToken before invoking the action.
/// </summary>
public static void ForAll2<TSource>(this ParallelQuery<TSource> source,
Action<TSource> action)
{
foreach (var _ in source.Select(item => { action(item); return 0; })) { }
}
Select
运算符将 ParallelQuery<TSource>
投影到具有零值的 ParallelQuery<int>
,然后使用空的 foreach
循环进行枚举。 action
作为枚举的副作用被并行调用。
在线演示.
您可以使用 Select 而不是 ForAll
CancellationTokenSource cts = new CancellationTokenSource(1000);
cts.Token.Register(() => Console.WriteLine("--Token Canceled"));
try
{
Enumerable.Range(1, 20)
.AsParallel()
.WithDegreeOfParallelism(2)
.WithCancellation(cts.Token)
.Select(x =>
{
Console.WriteLine($"Processing item #{x}");
Thread.Sleep(200);
//cts.Token.ThrowIfCancellationRequested();
return true;
}).ToArray();
Console.WriteLine($"The query was completed successfully");
}
catch (OperationCanceledException)
{
Console.WriteLine($"The query was canceled");
}