实现 ValueTaskSource.SetCompleted 的正确方法是什么

问题描述 投票:0回答:2

所以我们让这个类实现

IValueTaskSource
这段代码不能写成 async-await 因为没有什么可以等待的。我们向另一个正在运行的线程发送一条消息,并返回一个 ValueTask,调用者可以等待该 ValueTask 以获取另一个线程已完成处理该消息的通知。然而,另一个线程是预先存在的并且已经在工作。它通过完全不同的方式接收消息;处理消息,然后需要告诉线程池源线程它已完成。因此;
IValueTaskSource

没有库存

ValueTaskSource
(不讨论是否应该有;但是在这种情况下,库存版本的实用性值得怀疑)。我们实际拥有的看起来非常像这样:

class Message : IValueTaskSource {
    public ValueTask Send()
    {
        /* how the message is sent is irrelevant */
        return new ValueTask(this, 0);
    }

    private Action<object> continuation;
    private object continuationState;

    void IValueTaskSource.OnCompleted(Action<object> continuation, object state, short _, ValueTaskSourceOnCompletedFlags __)
    {
         lock(this) {
              if (GetStatus(_) == ValueTaskSourceStatus.Pending)
              {
                  this.continuation = continuation;
                  this.continuationState = state;
                  return;
              }
              continuation(continuationState); /* Suspect */
         }
    }

    public void SetCompleted()
    {
        lock (this)
        {
             /* set state completed omitted for brevity */
             continuation?.Invoke(continuationState); /* Suspect */             
        }
    }
}

我想我做错了。想象一下这样一个大链条;看起来它会建立太多的堆栈。特别是,标记为

/* Suspect */
的线正是如此;并且
ValueTaskSourceOnCompletionFlags
未使用。尽管它确实有一个好处,那就是
continuation
抛出的异常总是会发生在某个地方;假设这确实是一个问题。

现在,代码可以工作,因为只有三个,并且使用它们的延续与它们所在的线程无关。

c# asynchronous .net-core valuetask ivaluetasksource
2个回答
2
投票

根据Stephen Cleary提供的ManualResetValueTaskSource的链接和相应的源代码我能够给出答案。

ManualResetValueTaskSourceCore<T>
提供了
IValueTaskSource<T>
IValueTaskSource<T>
的完整实现。目前的情况是没有
void
实现,因此使用虚拟类型创建一个 void 实现。关于
bool
object
是否是最好的虚拟类型存在一些普遍的争论,但我认为这并不重要,因为
T
的成员填充无论如何都会强制对齐。

所以答案是转发所有方法。

    public ValueTask Send()
    {
        /* how the message is sent is irrelevant */
        return CraeteValueTask();
    }

    private ManualResetValueTaskSourceCore<object> taskSource;

    private ValueTask CreateValueTask() => new ValueTask(this, taskSource.Version);
    public ValueTaskSourceStatus GetStatus(short version) => taskSource.GetStatus(version);
    public void OnCompleted(Action<object> continuation, object state, short version, ValueTaskSourceOnCompletedFlags flags) => taskSource.OnCompleted(continuation, state, version, flags);
    public void SetCompleted() => taskSource.SetResult(null);

在这种情况下,每条消息都在自己的对象中,因此没有池化。没关系。调用现有的实现比尝试写下最小的正确实现要容易得多,因此它仍然是更好的方法。

我很确定如果我正在汇集价值任务源,正确的方法是在

Reset()
中调用
CreateValueTask()


1
投票

这里是一个使用

INotifyCompletion
接口来获取通知的示例,而不是更重的
IValueTaskSource
+
ValueTask
机制。
Message
类仅用一个额外的实例字段(即
Action
)进行了修改,并且通过公开
GetAwaiter
方法使其变得可等待。每个
Message
实例仅等待一次。

public class Message : INotifyCompletion
{
    private static readonly Action _completedSentinel = new(() => { });
    private Action _continuation;

    public Message GetAwaiter() { return this; }
    public bool IsCompleted
        => ReferenceEquals(Volatile.Read(ref _continuation), _completedSentinel);
    public void OnCompleted(Action continuation)
    {
        Action original = Interlocked.CompareExchange(ref _continuation,
            continuation, null);
        if (original is null) return; // Normal case
        if (ReferenceEquals(original, _completedSentinel))
            continuation(); // Rare case
        else
            throw new InvalidOperationException("Double await");
    }
    public void GetResult() { }

    public void SetCompleted()
    {
        Action continuation = Interlocked.Exchange(ref _continuation,
            _completedSentinel);
        if (continuation is null) return;
        ThreadPool.QueueUserWorkItem(state => ((Action)state).Invoke(), continuation);
    }
}

在线演示.

静态

_completedSentinel
字段用于解决
await
所在的线程与调用
SetCompleted
方法的线程之间可能发生的竞争条件。通常,
await
会首先发生,但如果它发生在
SetCompleted
之后,或者即使在
SetCompleted
/
IsCompleted
调用之间调用
OnCompleted
(这些由异步/等待机制)。

© www.soinside.com 2019 - 2024. All rights reserved.