使用 Task.WhenAny 等待 SemaphoreSlim 上的容量

问题描述 投票:0回答:2

我有一个异步处理管道。我正在实施一个约束,以便我需要限制下一阶段的提交数量。对于我的组件,我有:

  • 单个输入源(项目用源 ID 标记)
  • 我需要以循环方式将输入传播到的单个目的地

如果容量可供多个客户端使用,我将为每个客户端转发一条消息(即,如果我因为客户端 3 的信号量最终变得可用而醒来,我可能首先为客户端 2 发送一条消息,然后为客户端 3 发送消息,依此类推)

处理循环因此等待以下一个或多个条件才能继续处理:

  • 更多输入已到达(可能是针对未达到限制的客户端)
  • 我们为其保存数据的客户端已释放容量

理想情况下,我会使用

Task.WhenAny

  • 代表输入的任务
    c.Reader.WaitToReadAsync(ct).AsTask()
  • 代表我们持有数据的客户端的 N 个任务,但尚未提交有效(
    Wait
    SemaphoreSlim
    将失败)

SemaphoreSlim 的

AvailableWaitHandle
将是理想的选择 - 我想知道它何时可用 但我还不想保留它,因为我有一系列工作需要处理 - 我只想知道我的触发条件之一是否已满足出现了

有没有办法

await
AvailableWaitHandle

我当前的方法是源自@usr对类似问题的回答 - 发布供参考

我的实际代码在这里 - 在下面的我的自我回答中还有一些关于整个问题的更多细节

c# asynchronous concurrency semaphore system.threading.channels
2个回答
3
投票

我想知道它什么时候可用,但我还不想保留它,因为我有一系列工作需要处理

这很奇怪,看起来

SemaphoreSlim
可能不是你想要使用的。
SemaphoreSlim
是一种互斥对象,可以允许多个接受者。有时也用于节流。但我不想将其用作信号

看起来更像异步手动重置事件才是您真正想要的。或者,如果您想维护锁定/并发收集类型的概念,则可以使用异步监视器或条件变量。

也就是说,使用 SemaphoreSlim 作为信号是“可能”的。我只是非常犹豫是否建议将此作为解决方案,因为这个要求似乎突出了同步原语选择中的错误。


有没有办法等待AvailableWaitHandle?

是的。您可以使用
await

TaskCompletionSource
任何东西。特别是对于
WaitHandle
ThreadPool.RegisterWaitForSingleObject
为您提供高效的等待。
因此,您要做的就是创建一个TCS,向线程池注册该句柄,并在该句柄的回调中完成TCS。请记住,您希望确保 TCS 最终完成并且一切都得到妥善处理。

我的

AsyncEx 库

WaitHandleAsyncFactory.FromWaitHandle)对此提供支持;代码是

这里
我的 AsyncEx 库还支持异步手动重置事件、监视器和条件变量。


0
投票
@usr 的答案

的变体解决了我的问题 class SemaphoreSlimExtensions public static Task AwaitButReleaseAsync(this SemaphoreSlim s) => s.WaitAsync().ContinueWith(_t -> s.Release(), ct, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion, TaskScheduler.Default); public static bool TryTake(this SemaphoreSlim s) => s.Wait(0);

在我的用例中,
await

只是同步逻辑的触发器,然后遍历整个集合 - 在我的例子中,

TryTake
帮助器是处理信号量的条件获取和取决于信号量的处理的自然方法那。我的等待是这样的:
SemaphoreSlim[] throttled = Enumerable.Empty();
while (!ct.IsCancellationRequested)
{
    var throttledClients = from s in throttled select s.AwaitButReleaseAsync();
    var timeout = 3000;
    var otherConditions = new[] { input.Reader.WaitToReadAsync().ToTask(), Task.Delay(ct, timeout) };
    
    await Task.WhenAny(throttledClients.Append(otherConditions));
    throttled = propagateStuff();
}

实际代码在这里

- 我还有其他遵循相同一般模式的案例。最重要的是,我想将等待 SemaphoreSlim 上可用容量的担忧与实际保留该容量分开。

    

© www.soinside.com 2019 - 2024. All rights reserved.