假设以下情况:
做到这一点的一种方法是将所有内容都推入
System.Threading.Channel<AnyOf<Event1, Event2>>
- 这满足了上面的所有要求..除了最后一个。据我所知,无法将事件 2 优先于项目 1。
一个可能更好的方法是将这些事件推送到单独的通道中,并从这两个通道中读取。
但是,最好的方法是什么?
粗略地说,我可以这样做:
class Example {
private readonly Channel<Event1> _channel1 = Channel.CreateBounded<Event1>(....);
private readonly Channel<Event2> _channel2 = Channel.CreateBounded<Event2>(....);
public async Task Worker(CancellationToken ct) {
while(!ct.IsCancellationRequested) {
var valueTask1 = _channel1.Reader.WaitToReadAsync(ct);
var valueTask2 = _channel2.Reader.WaitToReadAsync(ct);
// Waaait a moment.. this is wishful thinking! WhenAny for ValueTasks does not exist, at least not out-of-the-box
if(await ValueTaskExtensions.WhenAny(valueTask1, valueTask2) == 0)
{
// valueTask1 triggered
var element = await _channel1.Reader.ReadAsync(ct);
// Process
} else {
// valueTask2 triggered
var element = await _channel2.Reader.ReadAsync(ct);
// Process
}
}
}
}
需要注意的是,没有内置的
ValueTask.WhenAny()
。由于每个传入价格变动的分配,将这些变成适当的任务似乎非常浪费。
然而,有社区贡献用于
WhenAny
的实施,尽管我还没有在实践中使用这些,并且无法判断它们的质量和可能的警告。
有没有更聪明的方法可以从同一个读者线程的两个频道进行阅读,而不必离开
ValueTasks
的奇妙世界?
我想出了这种消费方法,它消耗两个通道,优先考虑第一个通道。只要第一个通道有一个元素可供消费,
ConsumeBothPrioritized
就会yield
这个元素,并且第二个通道不会产生任何元素。
public record struct AnyOf<T1, T2>(T1 Item1, T2 Item2, int Index);
/// <summary>
/// Consumes two channels, prioritizing the first channel over the second.
/// </summary>
public static async IAsyncEnumerable<AnyOf<T1, T2>> ConsumeBothPrioritized<T1, T2>(
ChannelReader<T1> channelReader1, ChannelReader<T2> channelReader2,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(channelReader1);
ArgumentNullException.ThrowIfNull(channelReader2);
cancellationToken.ThrowIfCancellationRequested();
Task<bool> task1 = null;
Task<bool> task2 = null;
while (true)
{
// Consume the first channel until it's empty.
while (channelReader1.TryRead(out T1 item1))
{
yield return new(item1, default, 1);
cancellationToken.ThrowIfCancellationRequested();
task1 = null;
}
// Consume just one item from the second channel.
bool channel2IsEmpty = true;
if (channelReader2.TryRead(out T2 item2))
{
yield return new(default, item2, 2);
cancellationToken.ThrowIfCancellationRequested();
channel2IsEmpty = false;
task2 = null;
}
if (channel2IsEmpty)
{
// Both channels are currently empty.
// Wait until one of the channels becomes non-empty.
task1 ??= channelReader1.WaitToReadAsync(cancellationToken).AsTask();
task2 ??= channelReader2.WaitToReadAsync(cancellationToken).AsTask();
Task completedTask;
if (task1.IsCompleted && !task1.Result)
{
// Ignore the first channel. It has already completed.
await task2.ConfigureAwait(false);
completedTask = task2;
}
else if (task2.IsCompleted && !task2.Result)
{
// Ignore the second channel. It has already completed.
await task1.ConfigureAwait(false);
completedTask = task1;
}
else
{
// Take into account both channels.
completedTask = await Task.WhenAny(task1, task2).ConfigureAwait(false);
}
if (task1.IsCompleted && !task1.Result && task2.IsCompleted && !task2.Result)
yield break; // Both channels have completed.
// Discard the task that we know it has completed,
// unless the associated channel completed as well.
if (completedTask == task1 && task1.Result) task1 = null;
if (completedTask == task2 && task2.Result) task2 = null;
}
}
}
WaitToReadAsync
任务量。可以通过减少对 IsCompleted
和 Result
的调用来进一步优化它,这些调用是 volatile
读取,因此稍微昂贵。
如果两个通道之一异常完成,
ConsumeBothPrioritized
将在另一个通道完成之前以 AggregateException
不正常地完成。基本上,渠道传播异常的场景并未经过深思熟虑。假设通道始终成功完成。
使用示例:
await foreach (var item in ConsumeBothPrioritized(channel1.Reader, channel2.Reader))
{
if (item.Index == 1)
{
// Do something with item.Item1
}
else if (item.Index == 2)
{
// Do something with item.Item2
}
}