我有执行以下操作的 rx 管道 -
最后一步仅运行一次。我期待无限奔跑。 我在这里缺少什么?
var observable = Observable
.Interval(TimeSpan.FromSeconds(10))
.Scan(seed, NextPeriod)
.SelectMany(period =>
{
Console.WriteLine();
Console.WriteLine("fetching events for");
Console.WriteLine(period.Begin.ToString("yyyy-MM-dd hh:mm:ss.ffff"));
Console.WriteLine(period.End.ToString("yyyy-MM-dd hh:mm:ss.ffff"));
var events = FetchEvents(period, eventDataProvider);
Console.WriteLine($"{events.Count} events fetched");
return events;
})
.GroupBy(x => x.Channel).Subscribe(group =>
{
Console.WriteLine($"Channel - {group.Key}");
});
控制台输出
events generated
fetching events for
2024-05-15 03:16:00.0000
2024-05-15 03:16:59.0000
40 events fetched
Channel - group-1
Channel - group-2
Channel - group-3
fetching events for
2024-05-15 03:17:00.0000
2024-05-15 03:17:59.0000
1475 events fetched
fetching events for
2024-05-15 03:18:00.0000
2024-05-15 03:18:59.0000
1475 events fetched
这是您正在做的事情的独立版本:
IDisposable subscription =
Observable
.Interval(TimeSpan.FromSeconds(1))
.Take(3)
.SelectMany(x => new[]
{
new { Channel = "a", Value = x, },
new { Channel = "b", Value = x, },
new { Channel = "c", Value = x, },
})
.Do(x => Console.WriteLine($"{x.Channel}, {x.Value}"))
.GroupBy(x => x.Channel)
.Subscribe(group =>
{
Console.WriteLine($"Channel - {group.Key}");
});
SelectMany
正在产生这个价值流:
a, 0
b, 0
c, 0
a, 1
b, 1
c, 1
a, 2
b, 2
c, 2
现在
GroupBy
按 Channel
分组并生成 IObservable<IGroupedObservable<...>>
。这是一个嵌套的可观察对象。非常有效IObservable<IObservable<...>>
。
第一次看到每个通道时,它会订阅 outer 可观察对象,并输出通道的键 ,用于它使用该键看到的第一个可观察对象。你没有对内部可观察值做任何事情 - 所以内部可观察值返回一个值多少次并不重要 - 你只是还没有订阅它。