可等待的自动重置事件

问题描述 投票:0回答:11

AutoResetEvent 的异步(可等待)等价物是什么?

如果在经典的线程同步中我们会使用这样的东西:

    AutoResetEvent signal = new AutoResetEvent(false);

    void Thread1Proc()
    {
        //do some stuff
        //..
        //..

        signal.WaitOne(); //wait for an outer thread to signal we are good to continue

        //do some more stuff
        //..
        //..
    }

    void Thread2Proc()
    {
        //do some stuff
        //..
        //..

        signal.Set(); //signal the other thread it's good to go

        //do some more stuff
        //..
        //..
    }

我希望在新的异步处理方式中,会出现这样的情况:

SomeAsyncAutoResetEvent asyncSignal = new SomeAsyncAutoResetEvent();

async void Task1Proc()
{
    //do some stuff
    //..
    //..

    await asyncSignal.WaitOne(); //wait for an outer thread to signal we are good to continue

    //do some more stuff
    //..
    //..
}

async void Task2Proc()
{
    //do some stuff
    //..
    //..

    asyncSignal.Set(); //signal the other thread it's good to go

    //do some more stuff
    //..
    //..
}

我见过其他定制的解决方案,但在某个时间点我设法得到的解决方案仍然涉及锁定线程。我不希望这只是为了使用新的等待语法。我正在寻找一种真正的可等待信号机制,它不会锁定任何线程。

任务并行库中是否缺少某些内容?

编辑:只是为了澄清:SomeAsyncAutoResetEvent 是一个完全组成的类名,在我的示例中用作占位符。

c# .net multithreading asynchronous task-parallel-library
11个回答
34
投票

如果您想构建自己的,Stephen Toub 有关于该主题的权威博客文章

如果您想使用已经编写的库,我的 AsyncEx 库中有一个。 AFAIK,截至撰写本文时没有其他选择。


23
投票

这是 Stephen Toub 的 AsyncAutoResetEvent

来源
,以防他的博客离线。

internal sealed class AsyncAutoResetEvent
{
    private static readonly Task s_completed = Task.FromResult(true);
    private readonly Queue<TaskCompletionSource<bool>> _waits = new Queue<TaskCompletionSource<bool>>();
    private bool _signaled;

    public Task WaitAsync()
    {
        lock (_waits)
        {
            if (_signaled)
            {
                _signaled = false;
                return s_completed;
            }
            else
            {
                var tcs = new TaskCompletionSource<bool>();
                _waits.Enqueue(tcs);
                return tcs.Task;
            }
        }
    }

    public void Set()
    {
        TaskCompletionSource<bool>? toRelease = null;

        lock (_waits)
        {
            if (_waits.Count > 0)
            {
                toRelease = _waits.Dequeue();
            }
            else if (!_signaled)
            {
                _signaled = true;
            }
        }

        toRelease?.SetResult(true);
    }
}

18
投票

我认为 MSDN 上有一个很好的例子:https://msdn.microsoft.com/en-us/library/hh873178%28v=vs.110%29.aspx#WHToTap

public static Task WaitOneAsync(this WaitHandle waitHandle)
{
    if (waitHandle == null) 
        throw new ArgumentNullException("waitHandle");

    var tcs = new TaskCompletionSource<bool>();
    var rwh = ThreadPool.RegisterWaitForSingleObject(waitHandle, 
        delegate { tcs.TrySetResult(true); }, null, -1, true);
    var t = tcs.Task;
    t.ContinueWith( (antecedent) => rwh.Unregister(null));
    return t;
}

9
投票

这是我编写的一个版本,它允许您指定超时。它源自 Stephen Toub 的解决方案。我们目前在生产工作负载中使用它。

public class AsyncAutoResetEvent
{
    readonly LinkedList<TaskCompletionSource<bool>> waiters = 
        new LinkedList<TaskCompletionSource<bool>>();

    bool isSignaled;

    public AsyncAutoResetEvent(bool signaled)
    {
        this.isSignaled = signaled;
    }

    public Task<bool> WaitAsync(TimeSpan timeout)
    {
        return this.WaitAsync(timeout, CancellationToken.None);
    }

    public async Task<bool> WaitAsync(TimeSpan timeout, CancellationToken cancellationToken)
    {
        TaskCompletionSource<bool> tcs;

        lock (this.waiters)
        {
            if (this.isSignaled)
            {
                this.isSignaled = false;
                return true;
            }
            else if (timeout == TimeSpan.Zero)
            {
                return this.isSignaled;
            }
            else
            {
                tcs = new TaskCompletionSource<bool>();
                this.waiters.AddLast(tcs);
            }
        }

        Task winner = await Task.WhenAny(tcs.Task, Task.Delay(timeout, cancellationToken));
        if (winner == tcs.Task)
        {
            // The task was signaled.
            return true;
        }
        else
        {
            // We timed-out; remove our reference to the task.
            // This is an O(n) operation since waiters is a LinkedList<T>.
            lock (this.waiters)
            {
                bool removed = this.waiters.Remove(tcs);
                Debug.Assert(removed);
                return false;
            }
        }
    }

    public void Set()
    {
        lock (this.waiters)
        {
            if (this.waiters.Count > 0)
            {
                // Signal the first task in the waiters list. This must be done on a new
                // thread to avoid stack-dives and situations where we try to complete the
                // same result multiple times.
                TaskCompletionSource<bool> tcs = this.waiters.First.Value;
                Task.Run(() => tcs.SetResult(true));
                this.waiters.RemoveFirst();
            }
            else if (!this.isSignaled)
            {
                // No tasks are pending
                this.isSignaled = true;
            }
        }
    }

    public override string ToString()
    {
        return $"Signaled: {this.isSignaled.ToString()}, Waiters: {this.waiters.Count.ToString()}";
    }
}

4
投票

我也在寻找 AsyncAutoResetEvent 类,现在似乎在命名空间 Microsoft.VisualStudio.Threading 中有一个可用

// Summary:
//     An asynchronous implementation of an AutoResetEvent.
[DebuggerDisplay("Signaled: {signaled}")]
public class AsyncAutoResetEvent

2
投票

也可以,但是这样可能会淡化使用

async
await
的目的。

AutoResetEvent asyncSignal = new AutoResetEvent();

async void Task1Proc()
{
    //do some stuff
    //..
    //..

    await Task.Run(() => asyncSignal.WaitOne()); //wait for an outer thread to signal we are good to continue

    //do some more stuff
    //..
    //..
}

2
投票

使用 SemaphoreSlim 实现可能要简单得多,我还没有在生产中测试过它,但它应该可以正常工作。

public class AsyncAutoResetEvent : IDisposable
{
    private readonly SemaphoreSlim _waiters;

    public AsyncAutoResetEvent(bool initialState)
        => _waiters = new SemaphoreSlim(initialState ? 1 : 0, 1);

    public Task<bool> WaitOneAsync(TimeSpan timeout, CancellationToken cancellationToken = default)
        => _waiters.WaitAsync(timeout, cancellationToken);

    public Task WaitOneAsync(CancellationToken cancellationToken = default)
        => _waiters.WaitAsync(cancellationToken);

    public void Set()
    {
        lock (_waiters)
        {
            if (_waiters.CurrentCount == 0)
                _waiters.Release();
        }
    }

    public override string ToString()
        => $"Signaled: {_waiters.CurrentCount != 0}"; 

    private bool _disposed;
    protected virtual void Dispose(bool disposing)
    {
        if (!_disposed)
        {
            if (disposing)
            {
                _waiters.Dispose();
            }

            _disposed = true;
        }
    }

    public void Dispose()
    {
        // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
        Dispose(disposing: true);
        GC.SuppressFinalize(this);
    }
}

2
投票

安装

Microsoft.VisualStudio.Threading
包(我使用 Nuget 完成),然后您将能够使用
AsyncAutoResetEvent
类,该类在其自己的文档中指出:

AutoResetEvent 的异步实现。

文档:https://learn.microsoft.com/en-us/dotnet/api/microsoft.visualstudio.threading.asyncautoresetevent


0
投票

我使用可选的超时(毫秒)扩展了 Oleg Gordeev 提供的 MSDN 示例:

public static Task WaitOneAsync(this WaitHandle waitHandle, double timeout = 0)
        {
            if (waitHandle == null) throw new ArgumentNullException("waitHandle");

            var tcs = new TaskCompletionSource<bool>();

            if (timeout > 0) 
            {
                var timer = new System.Timers.Timer(timeout) 
                { Enabled = true, AutoReset = false };

                ElapsedEventHandler del = default;
                del = delegate (object x, System.Timers.ElapsedEventArgs y)
                {
                    tcs.TrySetResult(true);
                    timer.Elapsed -= del; 
                    timer.Dispose();
                };

                timer.Elapsed += del;
            }
        
            var rwh = ThreadPool.RegisterWaitForSingleObject(waitHandle,
                      delegate { tcs.TrySetResult(true); },
                      null, -1, true);

            var t = tcs.Task;
            t.ContinueWith((antecedent) => rwh.Unregister(null));

            return t;
        }

-1
投票

这是我的一次性事件版本可以由多个线程等待。它内部依赖于

BoundedChannel

public class AsyncOneTimeEvent<T>
{
    private T Result { get; set; }

    private readonly Channel<bool> _channel = Channel.CreateBounded<bool>(new BoundedChannelOptions(1)
    {
        SingleReader = false,
        SingleWriter = true,
        FullMode = BoundedChannelFullMode.DropWrite,
    });

    public async Task<T> GetResult()
    {
        await _channel.Reader.WaitToReadAsync().ConfigureAwait(false);

        return this.Result;
    }

    public void SetResult(T result)
    {
        this.Result = result;
        _channel.Writer.Complete();
    }

    public void SetError(Exception ex)
    {
        _channel.Writer.Complete(ex);
    }
}

-1
投票

这是我使用

SemaphoreSlim
的完整实现,使用所有
SemaphoreSlim.WaitAsync
覆盖:

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Represents an event that, when signaled, resets automatically after releasing a single waiting task.
/// </summary>
public sealed class AutoResetEventAsync : IDisposable {

    /// <summary>
    /// Waits asynchronously until a signal is received.
    /// </summary>
    /// <returns>Task completed when the event is signaled.</returns>
    public async ValueTask WaitAsync() {
        if (CheckSignaled()) return;
        SemaphoreSlim s;
        lock (Q) Q.Enqueue(s = new(0, 1));
        await s.WaitAsync();
        lock (Q) if (Q.Count > 0 && Q.Peek() == s) Q.Dequeue().Dispose();
    }

    /// <summary>
    /// Waits asynchronously until a signal is received or the time runs out.
    /// </summary>
    /// <param name="millisecondsTimeout">The number of milliseconds to wait, <see cref="System.Threading.Timeout.Infinite"/>
    /// (-1) to wait indefinitely, or zero to return immediately.</param>
    /// <returns>Task completed when the event is signaled or the time runs out.</returns>
    public async ValueTask WaitAsync(int millisecondsTimeout) {
        if (CheckSignaled()) return;
        SemaphoreSlim s;
        lock (Q) Q.Enqueue(s = new(0, 1));
        await s.WaitAsync(millisecondsTimeout);
        lock (Q) if (Q.Count > 0 && Q.Peek() == s) Q.Dequeue().Dispose();
    }

    /// <summary>
    /// Waits asynchronously until a signal is received, the time runs out or the token is cancelled.
    /// </summary>
    /// <param name="millisecondsTimeout">The number of milliseconds to wait, <see cref="System.Threading.Timeout.Infinite"/>
    /// (-1) to wait indefinitely, or zero to return immediately.</param>
    /// <param name="cancellationToken">The <see cref="System.Threading.CancellationToken"/> to observe.</param>
    /// <returns>Task completed when the event is signaled, the time runs out or the token is cancelled.</returns>
    public async ValueTask WaitAsync(int millisecondsTimeout, CancellationToken cancellationToken) {
        if (CheckSignaled()) return;
        SemaphoreSlim s;
        lock (Q) Q.Enqueue(s = new(0, 1));
        try {
            await s.WaitAsync(millisecondsTimeout, cancellationToken);
        }
        finally {
            lock (Q) if (Q.Count > 0 && Q.Peek() == s) Q.Dequeue().Dispose();
        }
    }

    /// <summary>
    /// Waits asynchronously until a signal is received or the token is cancelled.
    /// </summary>
    /// <param name="cancellationToken">The <see cref="System.Threading.CancellationToken"/> to observe.</param>
    /// <returns>Task completed when the event is signaled or the token is cancelled.</returns>
    public async ValueTask WaitAsync(CancellationToken cancellationToken) {
        if (CheckSignaled()) return;
        SemaphoreSlim s;
        lock (Q) Q.Enqueue(s = new(0, 1));
        try {
            await s.WaitAsync(cancellationToken);
        }
        finally {
            lock (Q) if (Q.Count > 0 && Q.Peek() == s) Q.Dequeue().Dispose();
        }
    }

    /// <summary>
    /// Waits asynchronously until a signal is received or the time runs out.
    /// </summary>
    /// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds to wait,
    /// a <see cref="System.TimeSpan"/> that represents -1 milliseconds to wait indefinitely, or a System.TimeSpan
    /// that represents 0 milliseconds to return immediately.</param>
    /// <returns>Task completed when the event is signaled or the time runs out.</returns>
    public async ValueTask WaitAsync(TimeSpan timeout) {
        if (CheckSignaled()) return;
        SemaphoreSlim s;
        lock (Q) Q.Enqueue(s = new(0, 1));
        await s.WaitAsync(timeout);
        lock (Q) if (Q.Count > 0 && Q.Peek() == s) Q.Dequeue().Dispose();
    }

    /// <summary>
    /// Waits asynchronously until a signal is received, the time runs out or the token is cancelled.
    /// </summary>
    /// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds to wait,
    /// a <see cref="System.TimeSpan"/> that represents -1 milliseconds to wait indefinitely, or a System.TimeSpan
    /// that represents 0 milliseconds to return immediately.</param>
    /// <param name="cancellationToken">The <see cref="System.Threading.CancellationToken"/> to observe.</param>
    /// <returns>Task completed when the event is signaled, the time runs out or the token is cancelled.</returns>
    public async ValueTask WaitAsync(TimeSpan timeout, CancellationToken cancellationToken) {
        if (CheckSignaled()) return;
        SemaphoreSlim s;
        lock (Q) Q.Enqueue(s = new(0, 1));
        try {
            await s.WaitAsync(timeout, cancellationToken);
        }
        finally {
            lock (Q) if (Q.Count > 0 && Q.Peek() == s) Q.Dequeue().Dispose();
        }
    }

    /// <summary>
    /// Sets the state of the event to signaled, allowing one or more waiting tasks to proceed.
    /// </summary>
    public void Set() {
        SemaphoreSlim? toRelease = null;
        lock (Q) {
            if (Q.Count > 0) toRelease = Q.Dequeue();
            else if (!IsSignaled) IsSignaled = true;
        }
        toRelease?.Release();
    }

    /// <summary>
    /// Sets the state of the event to non nonsignaled, making the waiting tasks to wait.
    /// </summary>
    public void Reset() => IsSignaled = false;

    /// <summary>
    /// Disposes any semaphores left in the queue.
    /// </summary>
    public void Dispose() {
        lock (Q) {
            while (Q.Count > 0) Q.Dequeue().Dispose();
        }
    }

    /// <summary>
    /// Checks the <see cref="IsSignaled"/> state and resets it when it's signaled.
    /// </summary>
    /// <returns>True if the event was in signaled state.</returns>
    private bool CheckSignaled() {
        lock (Q) {
            if (IsSignaled) {
                IsSignaled = false;
                return true;
            }
            return false;
        }
    }

    private readonly Queue<SemaphoreSlim> Q = new();
    private volatile bool IsSignaled;

}

我使用

SemaphoreSlim
,因为它“免费”提供超时和取消令牌支持。如果我只是将
SemaphoreSlim
的原始 .NET 源代码修改为像
AutoResetEvent
那样,那就更好了,但是,仅此而已。如果您发现任何错误,请告诉我。

© www.soinside.com 2019 - 2024. All rights reserved.