我正在使用反应式库基于符号报价(外汇和差价合约)进行 1 秒柱聚合
到目前为止,IAsyncEnumerable 扩展还没有成功,所以我决定用推送策略替换拉取策略,其中反应式与操作符一起发挥作用。不幸的是我不熟悉它并陷入了另一个问题。不管怎样,我已经非常接近最终结果了。
我想要实现的是根据同一秒的“TickTime”按符号报价进行分组。我面临的问题是报价时间 - 来自经纪商端的 TickTime 和接收时间 - 来自通道的 ReceivedTime 之间的延迟(延迟约为 10 到 30 毫秒)。我尝试将延迟运算符与缓冲区(时间跨度持续时间)结合使用,但我想这不是正确的选择。我知道我应该使用窗口操作符,但我不知道如何使用,因为反应式方法对我来说是新的东西,我仍在学习。
注意事项: 如果 1 秒内没有出现报价,则可观察的应该发布空列表(某种超时),并且在我的情况下不可能进行后续更新。
这是我的短代码
await Task.Run(() => {
var asset = symbol;
var observable = streamQuotes.ToObservable()
.Delay(TimeSpan.FromMilliseconds(15))
.Buffer(TimeSpan.FromSeconds(1), Scheduler.Default)
.Subscribe(quotes =>
{
var topic = Topic.ToObject($"{provider}-{asset}-{Topic.SECOND}-1");
if (DebugMode)
Logger.LogInformation(
$"{topic} {JsonConvert.SerializeObject(quotes, Formatting.Indented)}");
//_ = AggregateAsync(topic, quotes, epochBegin, epochStep++), cancellationToken);
});
cancellationToken.Register(() => observable.Dispose());
Subscriptions.Add(observable);
}, cancellationToken);
产生以下输出
{
"Id": 142,
"Bid": 45282.72,
"Ask": 45341.63,
"Symbol": "BTCUSD",
"Time": 1707431630102,
"TickTime": "2024-02-08T22:33:50.102+00:00",
"ReceivedTime": "2024-02-08T22:33:50.1187799+00:00"
},
{
"Id": 142,
"Bid": 45283.05,
"Ask": 45341.96,
"Symbol": "BTCUSD",
"Time": 1707431630552,
"TickTime": "2024-02-08T22:33:50.552+00:00",
"ReceivedTime": "2024-02-08T22:33:50.5654973+00:00"
},
{
"Id": 142,
"Bid": 45283.86,
"Ask": 45342.77,
"Symbol": "BTCUSD",
"Time": 1707431630591,
"TickTime": "2024-02-08T22:33:50.591+00:00",
"ReceivedTime": "2024-02-08T22:33:50.6046997+00:00"
},
{
"Id": 142,
"Bid": 45285.07,
"Ask": 45343.98,
"Symbol": "BTCUSD",
"Time": 1707431630702,
"TickTime": "2024-02-08T22:33:50.702+00:00",
"ReceivedTime": "2024-02-08T22:33:50.7155601+00:00"
},
{
"Id": 142,
"Bid": 45284.73,
"Ask": 45343.64,
"Symbol": "BTCUSD",
"Time": 1707431630753,
"TickTime": "2024-02-08T22:33:50.753+00:00",
"ReceivedTime": "2024-02-08T22:33:50.7658716+00:00"
},
{
"Id": 142,
"Bid": 45284.45,
"Ask": 45343.36,
"Symbol": "BTCUSD",
"Time": 1707431630853,
"TickTime": "2024-02-08T22:33:50.853+00:00",
"ReceivedTime": "2024-02-08T22:33:50.866821+00:00"
},
{
"Id": 142,
"Bid": 45284.54,
"Ask": 45343.46,
"Symbol": "BTCUSD",
"Time": 1707431631003,
"TickTime": "2024-02-08T22:33:50.999+00:00",
"ReceivedTime": "2024-02-08T22:33:51.005728+00:00"
},
{
"Id": 142,
"Bid": 45284.64,
"Ask": 45343.55,
"Symbol": "BTCUSD",
"Time": 1707431631040,
"TickTime": "2024-02-08T22:33:51.03+00:00",
"ReceivedTime": "2024-02-08T22:33:51.0530009+00:00"
},
{
"Id": 142,
"Bid": 45284.64,
"Ask": 45343.55,
"Symbol": "BTCUSD",
"Time": 1707431631040,
"TickTime": "2024-02-08T22:33:51.05+00:00",
"ReceivedTime": "2024-02-08T22:33:51.056112+00:00"
}
输出错误,因为列表中的最后 2 个对象属于下一秒。
请帮助我解决这个问题,或者至少为我指出正确的方向。
由于延迟和缓冲区依赖于平均精度为 15 毫秒的系统计时器,因此您的缓冲将始终在每秒之间滑动。
您需要依赖绝对事件时间。请看下面我的示例,它不依赖于计时器,而仅依赖于绝对时间。
static async Task Main()
{
var rnd = new Random();
var quotes = Observable
.Interval(TimeSpan.FromMilliseconds(300))
.Select(_ => rnd.NextDouble());
var sub = quotes
.Timestamp()
.Do(x => Console.WriteLine($"{x.Timestamp:mm:ss.fff}: {x.Value:N}"))
.GroupByUntil(
q => q.Timestamp.DateTime.AddSeconds(1).ToString("s"),
g => Observable.Return(Unit.Default).DelaySubscription(DateTime.Parse(g.Key, null, DateTimeStyles.AssumeUniversal)))
.Select(group => group.ToList())
.Concat()
.Subscribe(list =>
{
if(!list.Any()) return;
Console.WriteLine($"{list.First().Timestamp:HH:mm:ss.fff}-{list.Last().Timestamp:ss:fff}: Count: {list.Count}, Open: {list.First().Value:N}, High: {list.Max(x=>x.Value):N}, Low: {list.Min(x=>x.Value):N}, Close: {list.Last().Value:N}");
});
Console.ReadLine();
sub.Dispose();
}
如有需要解释,请随时询问。
这可能应该为您解决问题,或者至少引导您走上正确的道路:
var observable = streamQuotes.ToObservable()
.GroupBy(q => q.TickTime.Ticks / TimeSpan.TicksPerSecond)
.SelectMany(g => g.Buffer(TimeSpan.FromSeconds(1), Scheduler.Default))
.Merge()
.Subscribe(quotes =>
{
Console.Write(JsonConvert.SerializeObject(quotes, Newtonsoft.Json.Formatting.Indented));
});
您的代码存在一些问题:
TickTime
属性的秒 value进行缓冲/分组。您调用的
Buffer
运算符(以及 Window
运算符)将在代码运行和数据到达时按秒偏移量进行缓冲。它根本不查看您的数据。解决方案中的
GroupBy
首先按基于刻度的第二个值对数据进行分组。 Buffer
然后将它们形成一个列表。