我想编写一个评估两个传感器的传感器数据的应用程序。两个传感器都在Package
对象中发送数据,这些对象被拆分为Frame
对象。 Package
本质上是Tuple<Timestamp, Data[]>
,Frame
是Tuple<Timestamp, Data>
。然后,我需要始终使用两个来源中带有最早时间戳的Frame
。
所以基本上我的对象流是
Package -(1:n)-> Frame \
}-pair synchronized-> Tuple<Frame, Frame>
Package -(1:n)-> Frame /
假设每个Package
包含2个或3个值(真实度:5-7)和以1递增的整数时间戳(真实度:〜200Hz =>〜5ms增量)。为了简单起见,“数据”仅为timestamp * 100
。
Packages (timestamp, values[])
Source 1:
{(19, [1700, 1800, 1900]), (22, [2000, 2100, 2200]), (26, [2500, 2600]),
(29, [2700, 2800, 2900]), ...}
Source 2:
{(17, [1500, 1600, 1700]), (19, [1800, 1900]), (21, [2000, 2100]),
(26, [2400, 2500, 2600]), ...}
(1:n)
步骤之后:
Frames (timestamp, value)
Source 1:
{(17, 1700), (18, 1800), (19, 1900), (20, 2000), (21, 2100),
(22, 2200), (25, 2500), (26, 2600), (27, 2700), (28, 2800),
(29, 2900), ...}
Source 2:
{(15, 1500), (16, 1600), (17, 1700), (18, 1800), (19, 1900),
(20, 2000), (21, 2100), (24, 2400), (25, 2500), (26, 2600), ...}
pair synchronized
步骤之后:
Merged tuples (timestamp, source1, source2)
{(15, null, 1500), (16, null, 1600), (17, 1700, 1700), (18, 1800, 1800),
(19, 1900, 1900), (20, 2000, 2000), (21, 2100, 2100), (22, 2200, null),
(24, null, 2400), (25, 2500, 2500), (26, 2600, 2600), ...}
请注意,由于两个来源的无发送了一个值,因此缺少时间戳23
。那只是副作用。我可以放一个空的元组,没关系。元组是(27, 2700, 2700)
还是((27, 2700), (27, 2700))
也无所谓,i。 e。 Tuple<Timestamp, Data, Data>
或Tuple<Frame, Frame>
。
我很确定,如果我没看错文档,(1:n)
部分应该是TransformManyBlock<Package, Frame>
。
但是我要用哪个块TransformManyBlock<Package, Frame>
part?起初,我以为pair synchronized
是我要寻找的东西,但看起来它只是将两个元素索引配对了-明智的。但是,由于既不能确保两个流水线都以相同的时间戳开始,也不能确保两个流水线都将始终产生稳定的连续时间戳流(因为有时传输时会丢失几帧的数据包),所以这不是一种选择。因此,我需要的更多是“ MergeBlock”,并可以决定将两个输入流的哪个元素接下来传播到输出(如果有)。
我想我自己必须写类似的东西。但是我很难编写能正确处理两个ISourceBlock变量和一个ITargetBlock变量的代码。我基本上早就被卡住了:
JoinBlock<Frame, Frame>
我甚至对此草稿也不确定:方法应该是JoinBlock<Frame, Frame>
,所以我可以使用private void MergeSynchronized(
ISourceBlock<Frame> source1,
ISourceBlock<Frame> source2,
ITargetBlock<Tuple<Frame, Frame>> target)
{
var frame1 = source1.Receive();
var frame2 = source2.Receive();
//Loop {
// Depending on the timestamp [mis]match,
// either pair frame1+frame2 or frame1+null or null+frame2, and
// replace whichever frame(s) was/were propagated already
// with the next frame from the respective pipeline
//}
}
吗?循环的条件是什么?在哪里以及如何检查完成情况?如何解决明显的问题,即我的代码意味着我必须等到流中的间隙over才能意识到存在间隙?]]
我考虑过的替代方法是在管道中添加一个额外的块,确保每个传感器将足够的“前哨帧”放入管道中,以便始终将每个管道中的第一个对齐,将正确的两个对齐。我guess
是一种async
,它读取一个Frame,将“预期”时间戳与实际时间戳进行比较,然后为缺少的时间戳插入前哨帧,直到该帧的时间戳再次正确。 >或者var frame1 = await source1.ReceiveAsnyc();
部分是在TPL Dataflow对象处停止并开始已经在TransformManyBlock
部分工作的实际代码的地方?
我想编写一个评估两个传感器的传感器数据的应用程序。两个传感器都将其数据发送到Package对象中,Package对象将被拆分为Frame对象。包本质上是一个元组<...>
我知道使用反射不是一个好的解决方案。但是,TPL DataFlow API的主要问题是,一切都是内部/私有和密封的。这使您无法进行任何扩展。
无论如何,如果您遇到问题,您都需要手工实施或使用肮脏的骇客,如: