我有一个异步处理管道。我正在实施一个约束,以便我需要限制下一阶段的提交数量。对于我的组件,我有:
如果容量可供多个客户端使用,我将为每个客户端转发一条消息(即,如果我因为客户端 3 的信号量最终变得可用而醒来,我可能首先为客户端 2 发送一条消息,然后为客户端 3 发送消息,依此类推)
处理循环因此等待以下一个或多个条件才能继续处理:
理想情况下,我会使用
Task.WhenAny
与
c.Reader.WaitToReadAsync(ct).AsTask()
Wait
的 SemaphoreSlim
将失败)SemaphoreSlim 的
AvailableWaitHandle
将是理想的选择 - 我想知道它何时可用 但我还不想保留它,因为我有一系列工作需要处理 - 我只想知道我的触发条件之一是否已满足出现了
有没有办法
await
AvailableWaitHandle
?
我当前的方法是源自@usr对类似问题的回答 - 发布供参考
我想知道它什么时候可用,但我还不想保留它,因为我有一系列工作需要处理
这很奇怪,看起来
SemaphoreSlim
可能不是你想要使用的。 SemaphoreSlim
是一种互斥对象,可以允许多个接受者。有时也用于节流。但我不想将其用作信号。
看起来更像异步手动重置事件才是您真正想要的。或者,如果您想维护锁定/并发收集类型的概念,则可以使用异步监视器或条件变量。
也就是说,使用 SemaphoreSlim
作为信号是“可能”的。我只是非常犹豫是否建议将此作为解决方案,因为这个要求似乎突出了同步原语选择中的错误。
是的。您可以使用
await
来
TaskCompletionSource
任何东西。特别是对于 WaitHandle
,ThreadPool.RegisterWaitForSingleObject
为您提供高效的等待。因此,您要做的就是创建一个TCS,向线程池注册该句柄,并在该句柄的回调中完成TCS。请记住,您希望确保 TCS 最终完成并且一切都得到妥善处理。
我的
AsyncEx 库(WaitHandleAsyncFactory.FromWaitHandle
)对此提供支持;代码是
这里。 我的 AsyncEx 库还支持异步手动重置事件、监视器和条件变量。
的变体解决了我的问题
class SemaphoreSlimExtensions
public static Task AwaitButReleaseAsync(this SemaphoreSlim s) =>
s.WaitAsync().ContinueWith(_t -> s.Release(), ct,
TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion,
TaskScheduler.Default);
public static bool TryTake(this SemaphoreSlim s) =>
s.Wait(0);
在我的用例中,
await
只是同步逻辑的触发器,然后遍历整个集合 - 在我的例子中,
TryTake
帮助器是处理信号量的条件获取和取决于信号量的处理的自然方法那。我的等待是这样的:SemaphoreSlim[] throttled = Enumerable.Empty();
while (!ct.IsCancellationRequested)
{
var throttledClients = from s in throttled select s.AwaitButReleaseAsync();
var timeout = 3000;
var otherConditions = new[] { input.Reader.WaitToReadAsync().ToTask(), Task.Delay(ct, timeout) };
await Task.WhenAny(throttledClients.Append(otherConditions));
throttled = propagateStuff();
}
实际代码在这里 - 我还有其他遵循相同一般模式的案例。最重要的是,我想将等待 SemaphoreSlim
上可用容量的担忧与实际保留该容量分开。