我正在连接到一个网络服务,该服务为我提供一天的所有价格(没有时间信息)。每个价格结果都有对应的“批量运行”的ID。
“批生产”具有日期和时间戳,但我必须单独致电以获取当天的所有批生产信息。
因此,要获得每个结果的实际时间,我需要将两个API调用结合在一起。
我为此使用了Reactive,但是我无法可靠地合并两组数据。我以为CombineLatest
可以做到这一点,但是它似乎并没有按照我的想法工作(基于http://reactivex.io/documentation/operators/combinelatest.html,http://introtorx.com/Content/v1.0.10621.0/12_CombiningSequences.html#CombineLatest)。
[TestMethod]
public async Task EvenMoreBasicCombineLatestTest()
{
int batchStart = 100, batchCount = 10;
//create 10 results with batch ids [100, 109]
//the test uses lists just to make debugging easier
var resultsWithBatchIdList = Enumerable.Range(batchStart, batchCount)
.Select(id => new { BatchRunId = id, ResultValue = id * 10 })
.ToList();
var resultsWithBatchId = Observable.ToObservable(resultsWithBatchIdList);
Assert.AreEqual(batchCount, await resultsWithBatchId.Count());
//create 10 batches with ids [100, 109]
var batchesList = Enumerable.Range(batchStart, batchCount)
.Select(id => new
{
ThisId = id,
BatchName = String.Concat("abcd", id)
})
.ToList();
var batchesObservable = Observable.ToObservable(batchesList);
Assert.AreEqual(batchCount, await batchesObservable.Count());
//turn the batch set into a dictionary so we can look up each batch by its id
var batchRunsByIdObservable = batchesObservable.ToDictionary(batch => batch.ThisId);
//for each result, look up the corresponding batch id in the dictionary to join them together
var resultsWithCorrespondingBatch =
batchRunsByIdObservable
.CombineLatest(resultsWithBatchId, (batchRunsById, result) =>
{
Assert.AreEqual(NumberOfResultsToCreate, batchRunsById.Count);
var correspondingBatch = batchRunsById[result.BatchRunId];
var priceResultAndSourceBatch = new
{
Result = result,
SourceBatchRun = correspondingBatch
};
return priceResultAndSourceBatch;
});
Assert.AreEqual(batchCount, await resultsWithCorrespondingBatch.Count());
}
我希望随着'results'可观察结果的每个元素的出现,它将与可观察到的batch-id词典的每个元素结合在一起(后者只有一个元素)。但是,相反,似乎只有结果列表的最后一个元素才被加入。
我由此产生了一个更复杂的问题,但是在尝试创建最小再现时,即使这样做也会给我带来意想不到的结果。版本3.1.1、4.0.0、4.2.0等会发生这种情况。
((请注意,这些序列通常不像此人工示例中那样匹配,因此我不能仅将它们Zip
匹配。)
所以我该如何加入?我想通过Dictionary查找更多信息的结果流(也来自Observable)?
还要注意,目标是返回IObservable(resultsWithCorrespondingBatch),所以我不能只是await
batchRunsByIdObservable。
好吧,我想我明白了。我希望文档中的两个大理石图中的任何一个都稍有不同-会使CombineLatest
的细微之处更加明显:
N------1---2---3---
L--z--a------bc----
R------1---2-223---
a a bcc
它是结合在一起的[[最新”-因此,根据发出项目的时间,可能会丢失一些元组。我应该做的是SelectMany
:
.CombineLatest(resultsWithBatchId, (batchRunsById, result) =>
是:.SelectMany(batchRunsById => resultsWithBatchId.Select(result =>