从同一线程高效读取两个不同的System.Threading.Channels

问题描述 投票:0回答:1

假设以下情况:

  • 我有两个不同的事件流
  • 流 1 是基于拉动的快速移动事件流(想想金融工具价格)
  • 流 2 的移动速度要慢得多(根据这些价格考虑要采取的行动)
  • 两个流必须在一个联合工作线程/任务上处理
  • 实施必须尽可能高效
  • 流 2 应优先/在流 1 之前处理

做到这一点的一种方法是将所有内容都推入

System.Threading.Channel<AnyOf<Event1, Event2>>
- 这满足了上面的所有要求..除了最后一个。据我所知,无法将事件 2 优先于项目 1。

一个可能更好的方法是将这些事件推送到单独的通道中,并从这两个通道中读取。

但是,最好的方法是什么?

粗略地说,我可以这样做:

class Example {
  private readonly Channel<Event1> _channel1 = Channel.CreateBounded<Event1>(....);
  private readonly Channel<Event2> _channel2 = Channel.CreateBounded<Event2>(....);

  public async Task Worker(CancellationToken ct) {
    while(!ct.IsCancellationRequested) {
      var valueTask1 = _channel1.Reader.WaitToReadAsync(ct);
      var valueTask2 = _channel2.Reader.WaitToReadAsync(ct);
      
      // Waaait a moment.. this is wishful thinking! WhenAny for ValueTasks does not exist, at least not out-of-the-box
      if(await ValueTaskExtensions.WhenAny(valueTask1, valueTask2) == 0)
      {
        // valueTask1 triggered
        var element = await _channel1.Reader.ReadAsync(ct);
        // Process
      } else {
        // valueTask2 triggered
        var element = await _channel2.Reader.ReadAsync(ct);
        // Process
      }
    }
  }

}

需要注意的是,没有内置的

ValueTask.WhenAny()
。由于每个传入价格变动的分配,将这些变成适当的任务似乎非常浪费。

然而,有社区贡献用于

WhenAny
的实施,尽管我还没有在实践中使用这些,并且无法判断它们的质量和可能的警告。

有没有更聪明的方法可以从同一个读者线程的两个频道进行阅读,而不必离开

ValueTasks
的奇妙世界?

c# concurrency producer-consumer
1个回答
0
投票

我想出了这种消费方法,它消耗两个通道,优先考虑第一个通道。只要第一个通道有一个元素可供消费,

ConsumeBothPrioritized
就会
yield
这个元素,并且第二个通道不会产生任何元素。

public record struct AnyOf<T1, T2>(T1 Item1, T2 Item2, int Index);

/// <summary>
/// Consumes two channels, prioritizing the first channel over the second.
/// </summary>
public static async IAsyncEnumerable<AnyOf<T1, T2>> ConsumeBothPrioritized<T1, T2>(
    ChannelReader<T1> channelReader1, ChannelReader<T2> channelReader2,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(channelReader1);
    ArgumentNullException.ThrowIfNull(channelReader2);
    cancellationToken.ThrowIfCancellationRequested();
    Task<bool> task1 = null;
    Task<bool> task2 = null;
    while (true)
    {
        // Consume the first channel until it's empty.
        while (channelReader1.TryRead(out T1 item1))
        {
            yield return new(item1, default, 1);
            cancellationToken.ThrowIfCancellationRequested();
            task1 = null;
        }

        // Consume just one item from the second channel.
        bool channel2IsEmpty = true;
        if (channelReader2.TryRead(out T2 item2))
        {
            yield return new(default, item2, 2);
            cancellationToken.ThrowIfCancellationRequested();
            channel2IsEmpty = false;
            task2 = null;
        }

        if (channel2IsEmpty)
        {
            // Both channels are currently empty.
            // Wait until one of the channels becomes non-empty.
            task1 ??= channelReader1.WaitToReadAsync(cancellationToken).AsTask();
            task2 ??= channelReader2.WaitToReadAsync(cancellationToken).AsTask();
            Task completedTask;
            if (task1.IsCompleted && !task1.Result)
            {
                // Ignore the first channel. It has already completed.
                await task2.ConfigureAwait(false);
                completedTask = task2;
            }
            else if (task2.IsCompleted && !task2.Result)
            {
                // Ignore the second channel. It has already completed.
                await task1.ConfigureAwait(false);
                completedTask = task1;
            }
            else
            {
                // Take into account both channels.
                completedTask = await Task.WhenAny(task1, task2).ConfigureAwait(false);
            }

            if (task1.IsCompleted && !task1.Result && task2.IsCompleted && !task2.Result)
                yield break; // Both channels have completed.

            // Discard the task that we know it has completed,
            // unless the associated channel completed as well.
            if (completedTask == task1 && task1.Result) task1 = null;
            if (completedTask == task2 && task2.Result) task2 = null;
        }
    }
}

此方法最大限度地减少了其创建的

WaitToReadAsync
任务量。可以通过减少对
IsCompleted
Result
的调用来进一步优化它,这些调用是
volatile
读取,因此稍微昂贵。

如果两个通道之一异常完成,

ConsumeBothPrioritized
将在另一个通道完成之前以
AggregateException
不正常地完成。基本上,渠道传播异常的场景并未经过深思熟虑。假设通道始终成功完成。

使用示例:

await foreach (var item in ConsumeBothPrioritized(channel1.Reader, channel2.Reader))
{
    if (item.Index == 1)
    {
        // Do something with item.Item1
    }
    else if (item.Index == 2)
    {
        // Do something with item.Item2
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.