Task.WhenAny 和 SemaphoreSlim 类

问题描述 投票:0回答:3

使用

WaitHandle.WaitAny
Semaphore
时,如下所示:

var s1 = new Semaphore(1, 1);
var s2 = new Semaphore(1, 1);

var handles = new [] { s1, s2 };

var index = WaitHandle.WaitAny(handles);

handles[index].Release();

似乎可以保证

WaitHandle.WaitAny
只会获取一个信号量。

是否可以为异步(async/await)代码获得类似的行为?

c# asynchronous
3个回答
1
投票

我想不出内置的解决方案。我会这样做:

var s1 = new SemaphoreSlim(1, 1);
var s2 = new SemaphoreSlim(1, 1);

var waits = new [] { s1.WaitAsync(), s2.WaitAsync() };

var firstWait = await Task.WhenAny(waits);

//The wait is still running - perform compensation.
if (firstWait == waits[0])
 waits[1].ContinueWith(_ => s2.Release());
if (firstWait == waits[1])
 waits[0].ContinueWith(_ => s1.Release());

这会获取两个信号量,但会立即释放第二个信号量。这应该是等价的。我无法想象不必要地获取信号量的负面后果(当然性能除外)。


1
投票

这是

WaitAnyAsync
方法的通用实现,它异步获取任何提供的信号量:

/// <summary>
/// Asynchronously waits to enter any of the semaphores in the specified array.
/// </summary>
public static async Task<SemaphoreSlim> WaitAnyAsync(SemaphoreSlim[] semaphores,
    CancellationToken cancellationToken = default)
{
    // Fast path
    cancellationToken.ThrowIfCancellationRequested();
    var acquired = semaphores.FirstOrDefault(x => x.Wait(0));
    if (acquired != null) return acquired;

    // Slow path
    using var cts = CancellationTokenSource.CreateLinkedTokenSource(
        cancellationToken);
    Task<SemaphoreSlim>[] acquireTasks = semaphores
        .Select(async s => { await s.WaitAsync(cts.Token); return s; })
        .ToArray();

    Task<SemaphoreSlim> acquiredTask = await Task.WhenAny(acquireTasks);

    cts.Cancel(); // Cancel all other tasks

    var releaseOtherTasks = acquireTasks
        .Where(task => task != acquiredTask)
        .Select(async task => (await task).Release());

    try { await Task.WhenAll(releaseOtherTasks); }
    catch (OperationCanceledException) { } // Ignore
    catch
    {
        // Consider any other error (possibly SemaphoreFullException or
        // ObjectDisposedException) as a failure, and propagate the exception.
        try { (await acquiredTask).Release(); } catch { }
        throw;
    }

    try { return await acquiredTask; }
    catch (OperationCanceledException)
    {
        // Propagate an exception holding the correct CancellationToken
        cancellationToken.ThrowIfCancellationRequested();
        throw; // Should never happen
    }
}

随着争用越来越高,这种方法变得越来越低效,所以我不建议在热路径中使用它。


-1
投票

@usr 的答案的变体解决了我稍微更普遍的问题(在相当长一段时间后,在尝试与

AvailableWaitHandle
结婚
Task
...)

    class SemaphoreSlimExtensions
    
        public static Task AwaitButReleaseAsync(this SemaphoreSlim s) => 
            s.WaitAsync().ContinueWith(_t -> s.Release(),
            TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion);
        public static bool TryTake(this SemaphoreSlim s) => 
            s.Wait(0);

在我的用例中,

await
只是同步逻辑的触发器,然后遍历整个集合 - 在我的例子中,
TryTake
帮助器是处理信号量的条件获取和取决于信号量的处理的自然方法那个。

var sems = new[] { new SemaphoreSlim(1, 1), new SemaphoreSlim(1, 1) };

await Task.WhenAny(from s in sems select s.AwaitButReleaseAsync());

将其放在这里是因为我相信它是干净、清晰且相对高效的,但很高兴看到它的改进

© www.soinside.com 2019 - 2024. All rights reserved.