异步同步是不好的。我知道。但是调用
TaskCompletionSource.Task.Wait()
的时候会不会出现sync-over-async的问题?如果TaskCompletionSource
是用TaskCreationOptions.RunContinuationsAsynchronously
创建的,答案会改变吗?
更新
回答评论中的问题。并非所有
Task
都是平等的。 Task
对象早在 async/await
之前就已引入,并用于并行编程。例如,以下代码没有任何问题,因为它不执行任何异步工作。
var task = Task.Run(() => Thread.Sleep(10_000));
task.Wait();
对于上下文:Kafka客户端有一个同步方法来生成消息,它接受一个动作来报告传递状态异步
void Produce(
TopicPartition topicPartition,
Message<TKey, TValue> message,
Action<DeliveryReport<TKey, TValue>> deliveryHandler = null);
在少数情况下,我需要等待交付报告才能继续工作,可以在同步或异步上下文中。为此,我有以下课程:
internal class DeliveryReportAwaiter<TKey, TValue> : IDisposable
{
private const int WaitForDeliveryGracePeriodFactor = 2;
private readonly int _waitDeliveryReportTimeoutMs;
private readonly ILogger _logger;
private readonly CancellationTokenSource _cancellationTokenSource;
private readonly TaskCompletionSource _taskCompletionSource;
private bool _disposed;
public DeliveryReportAwaiter(int waitDeliveryReportTimeoutMs, ILogger logger)
{
_logger = logger;
_waitDeliveryReportTimeoutMs = waitDeliveryReportTimeoutMs *
WaitForDeliveryGracePeriodFactor;
_taskCompletionSource = new TaskCompletionSource(
TaskCreationOptions.RunContinuationsAsynchronously);
_cancellationTokenSource = new CancellationTokenSource();
// in case OnDeliveryReportReceived was never called
_cancellationTokenSource.Token.Register(SetTaskTimeoutException);
}
public void WaitForDeliveryReport(CancellationToken token)
{
token.ThrowIfCancellationRequested();
_cancellationTokenSource.CancelAfter(_waitDeliveryReportTimeoutMs);
// Is this considered sync-over-async?
_taskCompletionSource.Task.Wait(token);
}
public Task WaitForDeliveryReportAsync(CancellationToken token)
{
token.ThrowIfCancellationRequested();
_cancellationTokenSource.CancelAfter(_waitDeliveryReportTimeoutMs);
return _taskCompletionSource.Task.WaitAsync(token);
}
public void OnDeliveryReportReceived(DeliveryReport<TKey, TValue> deliveryReport,
Action<DeliveryReport<TKey, TValue>> handleReportAction)
{
if (_disposed)
{
_logger.LogWarning(
"The delivery report for the message {Key} on topic {Topic} arrived " +
"after the awaiter was disposed due to timeout of cancellation. " +
"The delivery status is {Status}",
deliveryReport.Key,
deliveryReport.Topic,
deliveryReport.Status);
return;
}
if (!_cancellationTokenSource.TryReset())
{
SetTaskTimeoutException();
}
else
{
handleReportAction?.Invoke(deliveryReport);
_taskCompletionSource.TrySetResult();
}
}
public void Dispose()
{
if (_disposed)
{
return;
}
_disposed = true;
_cancellationTokenSource.Dispose();
}
private void SetTaskTimeoutException()
{
var errorMessage = $"Producer timed out while waiting for publish " +
$"confirm for {_waitDeliveryReportTimeoutMs}ms!";
_taskCompletionSource.TrySetException(new KafkaTimeoutException(errorMessage));
}
}
参见
WaitForDeliveryReport
方法实现。据我所知,您的冗长问题就是这样:
public void WaitForDeliveryReport(CancellationToken token)
{
token.ThrowIfCancellationRequested();
_cancellationTokenSource.CancelAfter(_waitDeliveryReportTimeoutMs);
// Is this considered sync-over-async?
_taskCompletionSource.Task.Wait(token);
}
答案是肯定的,当然是。你有一个理智的 API,你把它变成一个随机线程(可能是 GUI,如果你像这样编写代码)绝对没有理由。
编辑:
在少数情况下,我需要等待交付报告才能继续工作
C# 中有一个内置关键字:
await
。你不需要你的课程或异步同步伪造。
使用
TaskCompletionSource
选项初始化
TaskCreationOptions.RunContinuationsAsynchronously
意味着 Task
将在 ThreadPool
上完成,而不是在调用 TrySetException
方法的线程上完成。这在 Task
被 await
ed 的情况下很有用,这样完成线程就不会被 await
之后的延续劫持。在调用 SetResult
后让您的线程“被盗”是异步编程中错误的常见原因。这是大多数程序员非常意外的事情,在他们第一次体验它之前(通常是在最后一个漫长而令人不愉快的调试会话)。
但是如果
Task
是同步Wait
的,在ThreadPool
上完成它没有任何优势。 Wait
之后的延续将在等待线程上运行,而不是在完成线程上运行。完成线程所要做的就是发出内部ManualResetEventSlim
信号,这是可以忽略不计的工作量。
所以你应该避免用这个选项初始化
TaskCompletionSource
,如果你打算同步Wait
它。这样做不仅无缘无故地增加了开销,而且还有延迟完成任务的风险,以防ThreadPool
饱和。在极端情况下,甚至可能发生完全死锁,以防ThreadPool
饱和并达到其最大尺寸。