PLINQ ForAll WithCancellation 不起作用

问题描述 投票:0回答:3

我编写了一个以

ForAll
运算符结尾的 PLINQ 查询,并使用
WithCancellation
运算符来中途取消查询。令人惊讶的是,查询并没有被取消。以下是此行为的最小演示:

CancellationTokenSource cts = new CancellationTokenSource(1000);
cts.Token.Register(() => Console.WriteLine("--Token Canceled"));
    
try
{
    Enumerable.Range(1, 20)
        .AsParallel()
        .WithDegreeOfParallelism(2)
        .WithCancellation(cts.Token)
        .ForAll(x =>
        {
            Console.WriteLine($"Processing item #{x}");
            Thread.Sleep(200);
            //cts.Token.ThrowIfCancellationRequested();
        });
    Console.WriteLine($"The query was completed successfully");
}
catch (OperationCanceledException)
{
    Console.WriteLine($"The query was canceled");
}

在线演示.

输出(不需要):

Processing item #1
Processing item #2
Processing item #4
Processing item #3
Processing item #5
Processing item #6
Processing item #8
Processing item #7
Processing item #10
Processing item #9
--Token Canceled
Processing item #11
Processing item #12
Processing item #13
Processing item #14
Processing item #15
Processing item #16
Processing item #17
Processing item #19
Processing item #20
Processing item #18
The query was canceled

查询以

OperationCanceledException
结束,但在处理所有 20 个项目之前完成。当我取消注释
cts.Token.ThrowIfCancellationRequested();
行时,就会出现理想的行为。

输出(期望):

Processing item #2
Processing item #1
Processing item #3
Processing item #4
Processing item #5
Processing item #6
Processing item #7
Processing item #8
Processing item #9
Processing item #10
--Token Canceled
The query was canceled

我做错了什么,还是这是

ForAll
+
WithCancellation
组合的设计行为?或者这是 PLINQ 库中的错误?

c# parallel-processing task-parallel-library cancellation plinq
3个回答
3
投票

这似乎是设计使然,但逻辑与您想象的有点不同。如果我们深入研究一下源代码,我们会发现相关的

ForAll
实现here:

while (_source.MoveNext(ref element, ref keyUnused))
{
    if ((i++ & CancellationState.POLL_INTERVAL) == 0)
        _cancellationToken.ThrowIfCancellationRequested();
    _elementAction(element);         
}

所以它确实检查取消,但不是每次迭代。如果我们检查

CancellationState.POLL_INTERVAL

/// <summary>
/// Poll frequency (number of loops per cancellation check) for situations where per-1-loop testing is too high an overhead.
/// </summary>
internal const int POLL_INTERVAL = 63;  //must be of the form (2^n)-1.

// The two main situations requiring POLL_INTERVAL are:
//    1. inner loops of sorting/merging operations
//    2. tight loops that perform very little work per MoveNext call.
// Testing has shown both situations have similar requirements and can share the same constant for polling interval.
//
// Because the poll checks are per-N loops, if there are delays in user code, they may affect cancellation timeliness.
// Guidance is that all user-delegates should perform cancellation checks at least every 1ms.
//
// Inner loop code should poll once per n loop, typically via:
// if ((i++ & CancellationState.POLL_INTERVAL) == 0)
//     _cancellationToken.ThrowIfCancellationRequested();
// (Note, this only behaves as expected if FREQ is of the form (2^n)-1

所以基本上 PLINQ 开发人员假设您在

ForAll
(以及类似的方法)中有一个非常快的代码,因此他们认为每次迭代检查取消是浪费的,因此他们每 64 次迭代检查一次。如果您有长时间运行的代码 - 您可以自行检查是否取消。我想他们必须这样做,因为在这种情况下他们无法在所有情况下都做正确的事情,但是如果他们检查每次迭代 - 你将无法避免性能成本。

如果增加代码中的迭代次数并调整取消超时 - 您会发现它确实会在大约 64 次迭代后取消(在每个分区上,总共 128 次)。


0
投票

Evk 的回答 彻底解释了观察到的行为:PLINQ 运算符定期检查取消令牌,而不是针对每个处理的项目。我寻找一种方法来改变这种行为,我想我找到了一种方法。当使用

foreach
循环枚举并行查询时,每次迭代都会检查取消标记。所以这是我想出的解决方案:

/// <summary>
/// Invokes in parallel the specified action for each element in the source,
/// checking the associated CancellationToken before invoking the action.
/// </summary>
public static void ForAll2<TSource>(this ParallelQuery<TSource> source,
    Action<TSource> action)
{
    foreach (var _ in source.Select(item => { action(item); return 0; })) { }
}

Select
运算符将
ParallelQuery<TSource>
投影到具有零值的
ParallelQuery<int>
,然后使用空的
foreach
循环进行枚举。
action
作为枚举的副作用被并行调用。

在线演示.


0
投票

您可以使用 Select 而不是 ForAll

CancellationTokenSource cts = new CancellationTokenSource(1000);
cts.Token.Register(() => Console.WriteLine("--Token Canceled"));
    
try
{
    Enumerable.Range(1, 20)
        .AsParallel()
        .WithDegreeOfParallelism(2)
        .WithCancellation(cts.Token)
        .Select(x =>
        {
            Console.WriteLine($"Processing item #{x}");
            Thread.Sleep(200);
            //cts.Token.ThrowIfCancellationRequested();
            return true;
        }).ToArray();
    Console.WriteLine($"The query was completed successfully");
}
catch (OperationCanceledException)
{
    Console.WriteLine($"The query was canceled");
}
© www.soinside.com 2019 - 2024. All rights reserved.