如何在不产生内存分配开销的情况下暂停/恢复异步工作线程?

问题描述 投票:0回答:2

我有一个异步方法,其中包含无限的

while
循环。该循环由程序的主线程频繁地暂停和恢复。为此,我当前使用 Nito.AsyncEx
 包中的 
PauseTokenSource 类:

private readonly PauseTokenSource _pausation = new();

private async Task StartWorker()
{
    while (true)
    {
        await _pausation.Token.WaitWhilePausedAsync();
        DoSomething();
    }
}

这工作得很好,但我注意到每次

PauseTokenSource
暂停和恢复时都会分配大约 100 个字节。由于这种情况每秒发生多次,我正在寻找一种方法来消除这种开销。我的想法是用自制的暂停机制替换
PauseTokenSource
,该机制具有返回
WaitWhilePausedAsync
 而不是 
ValueTask
Task
方法。为了使实现免分配,
ValueTask
应该由实现
IValueTaskSource
接口的东西支持。这是我当前(失败的)尝试实现此机制:

public class Pausation : IValueTaskSource
{
    private ManualResetValueTaskSourceCore<bool> _source;
    private bool _paused;

    public Pausation() => _source.RunContinuationsAsynchronously = true;

    public void Pause()
    {
        _paused = true;
        _source.Reset();
    }

    public void Resume()
    {
        _paused = false;
        _source.SetResult(default);
    }

    public ValueTask WaitWhilePausedAsync()
    {
        if (!_paused) return ValueTask.CompletedTask;
        return new ValueTask(this, _source.Version);
    }

    void IValueTaskSource.GetResult(short token)
    {
        _source.GetResult(token);
    }

    ValueTaskSourceStatus IValueTaskSource.GetStatus(short token)
    {
        return _source.GetStatus(token);
    }

    void IValueTaskSource.OnCompleted(Action<object> continuation, object state,
        short token, ValueTaskSourceOnCompletedFlags flags)
    {
        _source.OnCompleted(continuation, state, token, flags);
    }
}

我的

Pausation
类基于
ManualResetValueTaskSourceCore<T>
结构,该结构应该简化最常见的
IValueTaskSource
实现。显然我做错了什么,因为当我的工作人员
await
WaitWhilePausedAsync
方法时,它会因
InvalidOperationException
而崩溃。

我的问题是:如何修复

Pausation
类,使其正常工作?我应该在
_paused
字段之外添加更多状态吗?我是否在错误的地方调用了
ManualResetValueTaskSourceCore<T>
方法?我要求要么提供详细的修复说明,要么要求完整且有效的实施。

规格:

Pausation
类旨在用于单个工作人员 - 单控制器场景。只有一个异步工作线程(
StartWorker
方法),并且只有一个控制器线程发出
Pause
Resume
命令。也不需要取消支持。工作线程的终止由
CancellationTokenSource
独立处理(为简洁起见,从上面的代码片段中删除)。唯一需要的功能是
Pause
Resume
WaitWhilePausedAsync
方法。唯一的要求是它能正常工作,并且不分配内存。

可以在此处找到我的工作人员-控制器场景的可运行在线演示。

Nito.AsyncEx.PauseTokenSource
类的输出:

Controller loops: 112,748
Worker loops: 84, paused: 36 times

我的

Pausation
类的输出:

Controller loops: 117,397
Unhandled exception. System.InvalidOperationException: Operation is not valid due to the current state of the object.
   at System.Threading.Tasks.Sources.ManualResetValueTaskSourceCore`1.GetStatus(Int16 token)
   at Program.Pausation.System.Threading.Tasks.Sources.IValueTaskSource.GetStatus(Int16 token)
   at Program.<>c__DisplayClass0_0.<<Main>g__StartWorker|0>d.MoveNext()
c# asynchronous valuetask nito.asyncex ivaluetasksource
2个回答
2
投票

我相信你误会了

Reset
ManualResetValueTaskSourceCore<T>.Reset
ManualResetEvent.Reset
完全不同。对于
ManualResetValueTaskSourceCore<T>
来说,
Reset
的意思是“之前的操作已经完成,现在想复用value任务,所以换版本”。因此,它应该在
GetResult
之后调用(即,在暂停的代码完成等待之后),而不是在
Pause
内调用。封装此 IMO 的最简单方法是在返回
Reset
之前立即 ValueTask 状态。
类似地,除非已返回值任务,否则您的代码不应调用 

SetResult

ManualResetValueTaskSourceCore<T>
实际上是非常严格地围绕顺序操作设计的,并且拥有单独的“控制器”会使事情变得复杂。您可以通过跟踪消费者是否在等待并仅在有等待时才尝试完成来使其工作:
public class Pausation : IValueTaskSource
{
    private ManualResetValueTaskSourceCore<bool> _source;
    private readonly object _mutex = new();
    private bool _paused;
    private bool _waiting;

    public Pausation() => _source.RunContinuationsAsynchronously = true;

    public void Pause()
    {
        lock (_mutex)
            _paused = true;
    }

    public void Resume()
    {
        var wasWaiting = false;
        lock (_mutex)
        {
            wasWaiting = _waiting;
            _paused = _waiting = false;
        }

        if (wasWaiting)
            _source.SetResult(default);
    }

    public ValueTask WaitWhilePausedAsync()
    {
        lock (_mutex)
        {
            if (!_paused) return ValueTask.CompletedTask;
            _waiting = true;
            _source.Reset();
            return new ValueTask(this, _source.Version);
        }
    }

    void IValueTaskSource.GetResult(short token)
    {
        _source.GetResult(token);
    }

    ValueTaskSourceStatus IValueTaskSource.GetStatus(short token)
    {
        return _source.GetStatus(token);
    }

    void IValueTaskSource.OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
    {
        _source.OnCompleted(continuation, state, token, flags);
    }
}

我使用互斥体修复了一些明显的竞争条件,但我不能保证没有剩下更微妙的条件。至少它肯定会仅限于单个控制器和单个消费者。


0
投票
ValueTask

的目的。 Stephen Toub 详细解释了为什么在这里引入它:

https://devblogs.microsoft.com/dotnet/understanding-the-whys-whats-and-whens-of-valuetask/
但最相关的信息是

ValueTask

不分配

iff
异步方法同步成功完成。 如果确实需要异步完成,由于后台的状态机,就不可避免地需要分配

Task

类。所以这可能完全需要另一个解决方案。

    

© www.soinside.com 2019 - 2024. All rights reserved.