我有一个异步方法,其中包含无限的
while
循环。该循环由程序的主线程频繁地暂停和恢复。为此,我当前使用 Nito.AsyncEx包中的
PauseTokenSource
类:
private readonly PauseTokenSource _pausation = new();
private async Task StartWorker()
{
while (true)
{
await _pausation.Token.WaitWhilePausedAsync();
DoSomething();
}
}
这工作得很好,但我注意到每次
PauseTokenSource
暂停和恢复时都会分配大约 100 个字节。由于这种情况每秒发生多次,我正在寻找一种方法来消除这种开销。我的想法是用自制的暂停机制替换 PauseTokenSource
,该机制具有返回 WaitWhilePausedAsync
而不是
ValueTask
的 Task
方法。为了使实现免分配,ValueTask
应该由实现IValueTaskSource
接口的东西支持。这是我当前(失败的)尝试实现此机制:
public class Pausation : IValueTaskSource
{
private ManualResetValueTaskSourceCore<bool> _source;
private bool _paused;
public Pausation() => _source.RunContinuationsAsynchronously = true;
public void Pause()
{
_paused = true;
_source.Reset();
}
public void Resume()
{
_paused = false;
_source.SetResult(default);
}
public ValueTask WaitWhilePausedAsync()
{
if (!_paused) return ValueTask.CompletedTask;
return new ValueTask(this, _source.Version);
}
void IValueTaskSource.GetResult(short token)
{
_source.GetResult(token);
}
ValueTaskSourceStatus IValueTaskSource.GetStatus(short token)
{
return _source.GetStatus(token);
}
void IValueTaskSource.OnCompleted(Action<object> continuation, object state,
short token, ValueTaskSourceOnCompletedFlags flags)
{
_source.OnCompleted(continuation, state, token, flags);
}
}
我的
Pausation
类基于 ManualResetValueTaskSourceCore<T>
结构,该结构应该简化最常见的 IValueTaskSource
实现。显然我做错了什么,因为当我的工作人员await
是WaitWhilePausedAsync
方法时,它会因InvalidOperationException
而崩溃。
我的问题是:如何修复
Pausation
类,使其正常工作?我应该在 _paused
字段之外添加更多状态吗?我是否在错误的地方调用了 ManualResetValueTaskSourceCore<T>
方法?我要求要么提供详细的修复说明,要么要求完整且有效的实施。
规格:
Pausation
类旨在用于单个工作人员 - 单控制器场景。只有一个异步工作线程(StartWorker
方法),并且只有一个控制器线程发出Pause
和Resume
命令。也不需要取消支持。工作线程的终止由 CancellationTokenSource
独立处理(为简洁起见,从上面的代码片段中删除)。唯一需要的功能是 Pause
、Resume
和 WaitWhilePausedAsync
方法。唯一的要求是它能正常工作,并且不分配内存。
可以在此处找到我的工作人员-控制器场景的可运行在线演示。
Nito.AsyncEx.PauseTokenSource
类的输出:
Controller loops: 112,748
Worker loops: 84, paused: 36 times
我的
Pausation
类的输出:
Controller loops: 117,397
Unhandled exception. System.InvalidOperationException: Operation is not valid due to the current state of the object.
at System.Threading.Tasks.Sources.ManualResetValueTaskSourceCore`1.GetStatus(Int16 token)
at Program.Pausation.System.Threading.Tasks.Sources.IValueTaskSource.GetStatus(Int16 token)
at Program.<>c__DisplayClass0_0.<<Main>g__StartWorker|0>d.MoveNext()
我相信你误会了
Reset
。 ManualResetValueTaskSourceCore<T>.Reset
与ManualResetEvent.Reset
完全不同。对于ManualResetValueTaskSourceCore<T>
来说,Reset
的意思是“之前的操作已经完成,现在想复用value任务,所以换版本”。因此,它应该在 GetResult
之后调用(即,在暂停的代码完成等待之后),而不是在 Pause
内调用。封装此 IMO 的最简单方法是在返回 Reset
之前立即 ValueTask
状态。类似地,除非已返回值任务,否则您的代码不应调用 SetResult
。
ManualResetValueTaskSourceCore<T>
实际上是非常严格地围绕顺序操作设计的,并且拥有单独的“控制器”会使事情变得复杂。您可以通过跟踪消费者是否在等待并仅在有等待时才尝试完成来使其工作:public class Pausation : IValueTaskSource
{
private ManualResetValueTaskSourceCore<bool> _source;
private readonly object _mutex = new();
private bool _paused;
private bool _waiting;
public Pausation() => _source.RunContinuationsAsynchronously = true;
public void Pause()
{
lock (_mutex)
_paused = true;
}
public void Resume()
{
var wasWaiting = false;
lock (_mutex)
{
wasWaiting = _waiting;
_paused = _waiting = false;
}
if (wasWaiting)
_source.SetResult(default);
}
public ValueTask WaitWhilePausedAsync()
{
lock (_mutex)
{
if (!_paused) return ValueTask.CompletedTask;
_waiting = true;
_source.Reset();
return new ValueTask(this, _source.Version);
}
}
void IValueTaskSource.GetResult(short token)
{
_source.GetResult(token);
}
ValueTaskSourceStatus IValueTaskSource.GetStatus(short token)
{
return _source.GetStatus(token);
}
void IValueTaskSource.OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
{
_source.OnCompleted(continuation, state, token, flags);
}
}
我使用互斥体修复了一些明显的竞争条件,但我不能保证没有剩下更微妙的条件。至少它肯定会仅限于单个控制器和单个消费者。