同步等待 TaskCompletionSource.Task

问题描述 投票:0回答:2

异步同步是不好的。我知道。但是调用

TaskCompletionSource.Task.Wait()
的时候会不会出现sync-over-async的问题?如果
TaskCompletionSource
是用
TaskCreationOptions.RunContinuationsAsynchronously
创建的,答案会改变吗?

更新
回答评论中的问题。并非所有

Task
都是平等的。
Task
对象早在
async/await
之前就已引入,并用于并行编程。例如,以下代码没有任何问题,因为它不执行任何异步工作。

var task = Task.Run(() => Thread.Sleep(10_000));
task.Wait();

对于上下文:Kafka客户端有一个同步方法来生成消息,它接受一个动作来报告传递状态异步

void Produce(
      TopicPartition topicPartition,
      Message<TKey, TValue> message,
      Action<DeliveryReport<TKey, TValue>> deliveryHandler = null);

在少数情况下,我需要等待交付报告才能继续工作,可以在同步或异步上下文中。为此,我有以下课程:

internal class DeliveryReportAwaiter<TKey, TValue> : IDisposable
{
    private const int WaitForDeliveryGracePeriodFactor = 2;
    private readonly int _waitDeliveryReportTimeoutMs;
    private readonly ILogger _logger;
    private readonly CancellationTokenSource _cancellationTokenSource;
    private readonly TaskCompletionSource _taskCompletionSource;

    private bool _disposed;

    public DeliveryReportAwaiter(int waitDeliveryReportTimeoutMs, ILogger logger)
    {
        _logger = logger;
        _waitDeliveryReportTimeoutMs = waitDeliveryReportTimeoutMs *
            WaitForDeliveryGracePeriodFactor;

        _taskCompletionSource = new TaskCompletionSource(
            TaskCreationOptions.RunContinuationsAsynchronously);
        _cancellationTokenSource = new CancellationTokenSource();
        // in case OnDeliveryReportReceived was never called
        _cancellationTokenSource.Token.Register(SetTaskTimeoutException);
    }

    public void WaitForDeliveryReport(CancellationToken token)
    {
        token.ThrowIfCancellationRequested();
        _cancellationTokenSource.CancelAfter(_waitDeliveryReportTimeoutMs);
        
        // Is this considered sync-over-async?
        _taskCompletionSource.Task.Wait(token);
    }

    public Task WaitForDeliveryReportAsync(CancellationToken token)
    {
        token.ThrowIfCancellationRequested();
        _cancellationTokenSource.CancelAfter(_waitDeliveryReportTimeoutMs);
        return _taskCompletionSource.Task.WaitAsync(token);
    }

    public void OnDeliveryReportReceived(DeliveryReport<TKey, TValue> deliveryReport,
        Action<DeliveryReport<TKey, TValue>> handleReportAction)
    {
        if (_disposed)
        {
            _logger.LogWarning(
                "The delivery report for the message {Key} on topic {Topic} arrived " +
                    "after the awaiter was disposed due to timeout of cancellation. " +
                    "The delivery status is {Status}",
                deliveryReport.Key,
                deliveryReport.Topic,
                deliveryReport.Status);

            return;
        }

        if (!_cancellationTokenSource.TryReset())
        {
            SetTaskTimeoutException();
        }
        else
        {
            handleReportAction?.Invoke(deliveryReport);
            _taskCompletionSource.TrySetResult();
        }
    }

    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;
        _cancellationTokenSource.Dispose();
    }

    private void SetTaskTimeoutException()
    {
        var errorMessage = $"Producer timed out while waiting for publish " +
            $"confirm for {_waitDeliveryReportTimeoutMs}ms!";
        _taskCompletionSource.TrySetException(new KafkaTimeoutException(errorMessage));
    }
}

参见

WaitForDeliveryReport
方法实现。
现在问题要长得多,但我希望它能帮助人们理解其背后的原因。

c# asynchronous task
2个回答
0
投票

据我所知,您的冗长问题就是这样:

public void WaitForDeliveryReport(CancellationToken token)
{
    token.ThrowIfCancellationRequested();
    _cancellationTokenSource.CancelAfter(_waitDeliveryReportTimeoutMs);
    
    // Is this considered sync-over-async?
    _taskCompletionSource.Task.Wait(token);
}

答案是肯定的,当然是。你有一个理智的 API,你把它变成一个随机线程(可能是 GUI,如果你像这样编写代码)绝对没有理由。

编辑:

在少数情况下,我需要等待交付报告才能继续工作

C# 中有一个内置关键字:

await
。你不需要你的课程或异步同步伪造。


0
投票

使用

TaskCompletionSource
 选项初始化 
TaskCreationOptions.RunContinuationsAsynchronously
意味着
Task
将在
ThreadPool
上完成,而不是在调用
TrySetException
方法的线程上完成。这在
Task
await
ed 的情况下很有用,这样完成线程就不会被
await
之后的延续劫持。在调用
SetResult
后让您的线程“被盗”是异步编程中错误的常见原因。这是大多数程序员非常意外的事情,在他们第一次体验它之前(通常是在最后一个漫长而令人不愉快的调试会话)。

但是如果

Task
是同步
Wait
的,在
ThreadPool
上完成它没有任何优势。
Wait
之后的延续将在等待线程上运行,而不是在完成线程上运行。完成线程所要做的就是发出内部
ManualResetEventSlim
信号,这是可以忽略不计的工作量。

所以你应该避免用这个选项初始化

TaskCompletionSource
,如果你打算同步
Wait
它。这样做不仅无缘无故地增加了开销,而且还有延迟完成任务的风险,以防
ThreadPool
饱和。在极端情况下,甚至可能发生完全死锁,以防
ThreadPool
饱和并达到其最大尺寸

© www.soinside.com 2019 - 2024. All rights reserved.