代码将在处理最后一个结果集之前执行:
var rxQueryTimeOut = pollingPeriodInSeconds + dbQueryTimeoutInSeconds;
var scheduler = new EventLoopScheduler(ts => new Thread(ts) {Name = "DatabasePoller"});
query = Observable.Interval(rxQueryTimeOut, scheduler).ObserveOn(SynchronizationContext.Current)
.Select(_ => dbQuery.LoadItemsFromDB(LastFetchedRecord)).Do((x) => DoSomething(x, itemsMgr))
.Retry() //Loop on errors
.Repeat(); //Loop on success
query.Subscribe();
private void DoSomething(List<ItemDBData> theoDataDbs, IItemsMgr itemsMgr)
{
foreach(var theoData in theoDataDbs)
{
}
}
DoSomething
完成执行之前,Observable.Interval
再次触发。
我尝试过:
private void DoSomething(List<ItemDBData> items, IItemsMgr itemsMgr)
{
if(items.Count> 0)
LastFetchedRecord = items[items.Count-1].TimeStamp;
foreach(var itemData in items)
{
}
}
但是我认为每个间隔都会丢弃最后一个查询请求,因为我正在丢失数据。我该如何解决?
您是否尝试每X秒处理数据库表中的新行?
如果在计时器用完之前无法处理一个查询中的所有行,它将丢失数据。
没有更多细节,很难看到问题。
考虑处理数据库查询返回的行时要执行多少阻塞操作。一个常见的问题是取回许多行,然后对其进行foreach循环,并对每行数据进行一次或多次数据库往返。