所以我们让这个类实现
IValueTaskSource
这段代码不能写成 async-await 因为没有什么可以等待的。我们向另一个正在运行的线程发送一条消息,并返回一个 ValueTask,调用者可以等待该 ValueTask 以获取另一个线程已完成处理该消息的通知。然而,另一个线程是预先存在的并且已经在工作。它通过完全不同的方式接收消息;处理消息,然后需要告诉线程池源线程它已完成。因此; IValueTaskSource
没有库存
ValueTaskSource
(不讨论是否应该有;但是在这种情况下,库存版本的实用性值得怀疑)。我们实际拥有的看起来非常像这样:
class Message : IValueTaskSource {
public ValueTask Send()
{
/* how the message is sent is irrelevant */
return new ValueTask(this, 0);
}
private Action<object> continuation;
private object continuationState;
void IValueTaskSource.OnCompleted(Action<object> continuation, object state, short _, ValueTaskSourceOnCompletedFlags __)
{
lock(this) {
if (GetStatus(_) == ValueTaskSourceStatus.Pending)
{
this.continuation = continuation;
this.continuationState = state;
return;
}
continuation(continuationState); /* Suspect */
}
}
public void SetCompleted()
{
lock (this)
{
/* set state completed omitted for brevity */
continuation?.Invoke(continuationState); /* Suspect */
}
}
}
我想我做错了。想象一下这样一个大链条;看起来它会建立太多的堆栈。特别是,标记为
/* Suspect */
的线正是如此;并且 ValueTaskSourceOnCompletionFlags
未使用。尽管它确实有一个好处,那就是 continuation
抛出的异常总是会发生在某个地方;假设这确实是一个问题。
现在,代码可以工作,因为只有三个,并且使用它们的延续与它们所在的线程无关。
根据Stephen Cleary提供的ManualResetValueTaskSource的链接和相应的源代码我能够给出答案。
ManualResetValueTaskSourceCore<T>
提供了IValueTaskSource<T>
和IValueTaskSource<T>
的完整实现。目前的情况是没有 void
实现,因此使用虚拟类型创建一个 void 实现。关于 bool
或 object
是否是最好的虚拟类型存在一些普遍的争论,但我认为这并不重要,因为 T
的成员填充无论如何都会强制对齐。
所以答案是转发所有方法。
public ValueTask Send()
{
/* how the message is sent is irrelevant */
return CraeteValueTask();
}
private ManualResetValueTaskSourceCore<object> taskSource;
private ValueTask CreateValueTask() => new ValueTask(this, taskSource.Version);
public ValueTaskSourceStatus GetStatus(short version) => taskSource.GetStatus(version);
public void OnCompleted(Action<object> continuation, object state, short version, ValueTaskSourceOnCompletedFlags flags) => taskSource.OnCompleted(continuation, state, version, flags);
public void SetCompleted() => taskSource.SetResult(null);
在这种情况下,每条消息都在自己的对象中,因此没有池化。没关系。调用现有的实现比尝试写下最小的正确实现要容易得多,因此它仍然是更好的方法。
我很确定如果我正在汇集价值任务源,正确的方法是在
Reset()
中调用 CreateValueTask()
。
INotifyCompletion
接口来获取通知的示例,而不是更重的 IValueTaskSource
+ValueTask
机制。 Message
类仅用一个额外的实例字段(即 Action
)进行了修改,并且通过公开 GetAwaiter
方法使其变得可等待。每个 Message
实例仅等待一次。
public class Message : INotifyCompletion
{
private static readonly Action _completedSentinel = new(() => { });
private Action _continuation;
public Message GetAwaiter() { return this; }
public bool IsCompleted
=> ReferenceEquals(Volatile.Read(ref _continuation), _completedSentinel);
public void OnCompleted(Action continuation)
{
Action original = Interlocked.CompareExchange(ref _continuation,
continuation, null);
if (original is null) return; // Normal case
if (ReferenceEquals(original, _completedSentinel))
continuation(); // Rare case
else
throw new InvalidOperationException("Double await");
}
public void GetResult() { }
public void SetCompleted()
{
Action continuation = Interlocked.Exchange(ref _continuation,
_completedSentinel);
if (continuation is null) return;
ThreadPool.QueueUserWorkItem(state => ((Action)state).Invoke(), continuation);
}
}
在线演示.
静态
_completedSentinel
字段用于解决 await
所在的线程与调用 SetCompleted
方法的线程之间可能发生的竞争条件。通常,await
会首先发生,但如果它发生在SetCompleted
之后,或者即使在SetCompleted
/IsCompleted
调用之间调用OnCompleted
(这些由异步/等待机制)。