在处理下一个查询之前批处理数据库结果

问题描述 投票:0回答:1

代码将在处理最后一个结果集之前执行:

var rxQueryTimeOut = pollingPeriodInSeconds + dbQueryTimeoutInSeconds;
var scheduler = new EventLoopScheduler(ts => new Thread(ts) {Name = "DatabasePoller"});
query = Observable.Interval(rxQueryTimeOut, scheduler).ObserveOn(SynchronizationContext.Current)
    .Select(_ => dbQuery.LoadItemsFromDB(LastFetchedRecord)).Do((x) => DoSomething(x, itemsMgr))
    .Retry() //Loop on errors
    .Repeat(); //Loop on success

query.Subscribe();

private void DoSomething(List<ItemDBData> theoDataDbs, IItemsMgr itemsMgr)
{
    foreach(var theoData in theoDataDbs)
    {

    }
}

DoSomething完成执行之前,Observable.Interval再次触发。

我尝试过:

private void DoSomething(List<ItemDBData> items, IItemsMgr itemsMgr)
{
    if(items.Count> 0)
        LastFetchedRecord = items[items.Count-1].TimeStamp;

    foreach(var itemData in items)
    {

    }
}

但是我认为每个间隔都会丢弃最后一个查询请求,因为我正在丢失数据。我该如何解决?

c# system.reactive
1个回答
0
投票

您是否尝试每X秒处理数据库表中的新行?

如果在计时器用完之前无法处理一个查询中的所有行,它将丢失数据。

没有更多细节,很难看到问题。

考虑处理数据库查询返回的行时要执行多少阻塞操作。一个常见的问题是取回许多行,然后对其进行foreach循环,并对每行数据进行一次或多次数据库往返。

© www.soinside.com 2019 - 2024. All rights reserved.