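Using the Dataflow CTP (in the TPL)
Is there a way to call BatchBlock.TriggerBatch automatically after a timeout if the number of currently queued or postponed items is less than the BatchSize?
Better still: this timeout should be reset to 0 each time the block receives a new item.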
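Yes, you can accomplish this rather elegantly by chaining blocks together. In this case you want to set up a TransformBlock and link it "before" the BatchBlock. That would look something like this: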
// System.Threading.Timer expects a TimerCallback, which takes a state argument.
Timer triggerBatchTimer = new Timer(_ => yourBatchBlock.TriggerBatch());

TransformBlock<T, T> timeoutTransformBlock = new TransformBlock<T, T>((value) =>
{
    // Every item that passes through restarts the 5-second countdown.
    triggerBatchTimer.Change(5000, Timeout.Infinite);
    return value;
});

timeoutTransformBlock.LinkTo(yourBatchBlock);
yourBufferBlock.LinkTo(timeoutTransformBlock);
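To make the snippet above easier to try out, here is a minimal end-to-end sketch of the same idea as a console program with top-level statements. The batch size of 10, the 500 ms timeout, the 13 posted items, and the names batchBlock, timeoutBlock, sink and batches are all assumptions chosen for illustration; they are not part of the original answer. On .NET Framework the System.Threading.Tasks.Dataflow NuGet package is needed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical sink that records every batch it receives.
var batches = new List<int[]>();

var batchBlock = new BatchBlock<int>(10);

// The timer starts disarmed; it is armed by the first item that arrives.
using var timer = new Timer(_ => batchBlock.TriggerBatch());

var timeoutBlock = new TransformBlock<int, int>(value =>
{
    // Restart the inactivity countdown on every item.
    timer.Change(500, Timeout.Infinite);
    return value;
});

var sink = new ActionBlock<int[]>(batch =>
{
    batches.Add(batch);
    Console.WriteLine($"Batch of {batch.Length}");
});

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
timeoutBlock.LinkTo(batchBlock, linkOptions);
batchBlock.LinkTo(sink, linkOptions);

// 13 items: one full batch of 10, then 3 left over.
for (int i = 0; i < 13; i++)
{
    timeoutBlock.Post(i);
}

// Give the inactivity timeout time to flush the 3 leftover items.
await Task.Delay(2000);

timeoutBlock.Complete();
await sink.Completion;
```

On an unloaded machine this should report a batch of 10 right away and a batch of 3 roughly half a second later, once the timer fires after the last item.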
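Here is a polished version of Drew Marsh's idea. This implementation uses the DataflowBlock.Encapsulate method to create a dataflow block that encapsulates the timer+batch functionality. Beyond the new timeout argument, the CreateBatchBlock method also supports all the options available to the normal BatchBlock constructor.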
public static IPropagatorBlock<T, T[]> CreateBatchBlock<T>(int batchSize,
    int timeout, GroupingDataflowBlockOptions dataflowBlockOptions = null)
{
    dataflowBlockOptions = dataflowBlockOptions ?? new GroupingDataflowBlockOptions();
    var batchBlock = new BatchBlock<T>(batchSize, dataflowBlockOptions);
    var timer = new System.Threading.Timer(_ => batchBlock.TriggerBatch());
    var transformBlock = new TransformBlock<T, T>((T value) =>
    {
        timer.Change(timeout, Timeout.Infinite);
        return value;
    }, new ExecutionDataflowBlockOptions()
    {
        BoundedCapacity = dataflowBlockOptions.BoundedCapacity,
        CancellationToken = dataflowBlockOptions.CancellationToken,
        EnsureOrdered = dataflowBlockOptions.EnsureOrdered,
        MaxMessagesPerTask = dataflowBlockOptions.MaxMessagesPerTask,
        NameFormat = dataflowBlockOptions.NameFormat,
        TaskScheduler = dataflowBlockOptions.TaskScheduler
    });
    transformBlock.LinkTo(batchBlock, new DataflowLinkOptions()
    {
        PropagateCompletion = true
    });
    return DataflowBlock.Encapsulate(transformBlock, batchBlock);
}
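Alternative: below is a BatchUntilInactiveBlock<T> class that offers the whole range of BatchBlock<T> functionality. This implementation is a thin wrapper around a BatchBlock<T> instance. It has less overhead than the previous CreateBatchBlock implementation, while behaving similarly.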
/// <summary>
/// Provides a dataflow block that batches inputs into arrays.
/// A batch is produced when the number of currently queued items becomes equal
/// to BatchSize, or when a Timeout period has elapsed after receiving the last item.
/// </summary>
public class BatchUntilInactiveBlock<T> : IPropagatorBlock<T, T[]>,
    IReceivableSourceBlock<T[]>
{
    private readonly BatchBlock<T> _source;
    private readonly Timer _timer;
    private readonly TimeSpan _timeout;

    public BatchUntilInactiveBlock(int batchSize, TimeSpan timeout,
        GroupingDataflowBlockOptions dataflowBlockOptions)
    {
        _source = new BatchBlock<T>(batchSize, dataflowBlockOptions);
        _timer = new Timer(_ => _source.TriggerBatch());
        _timeout = timeout;
    }

    public BatchUntilInactiveBlock(int batchSize, TimeSpan timeout) : this(batchSize,
        timeout, new GroupingDataflowBlockOptions())
    { }

    public int BatchSize => _source.BatchSize;
    public TimeSpan Timeout => _timeout;
    public Task Completion => _source.Completion;
    public int OutputCount => _source.OutputCount;

    public void Complete() => _source.Complete();

    void IDataflowBlock.Fault(Exception exception)
        => ((IDataflowBlock)_source).Fault(exception);

    public IDisposable LinkTo(ITargetBlock<T[]> target,
        DataflowLinkOptions linkOptions)
        => _source.LinkTo(target, linkOptions);

    public void TriggerBatch() => _source.TriggerBatch();

    public bool TryReceive(Predicate<T[]> filter, out T[] item)
        => _source.TryReceive(filter, out item);

    public bool TryReceiveAll(out IList<T[]> items)
        => _source.TryReceiveAll(out items);

    DataflowMessageStatus ITargetBlock<T>.OfferMessage(
        DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source,
        bool consumeToAccept)
    {
        var offerResult = ((ITargetBlock<T>)_source).OfferMessage(messageHeader,
            messageValue, source, consumeToAccept);
        if (offerResult == DataflowMessageStatus.Accepted)
            _timer.Change(_timeout, System.Threading.Timeout.InfiniteTimeSpan);
        return offerResult;
    }

    T[] ISourceBlock<T[]>.ConsumeMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target, out bool messageConsumed)
        => ((ISourceBlock<T[]>)_source).ConsumeMessage(messageHeader,
            target, out messageConsumed);

    bool ISourceBlock<T[]>.ReserveMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target)
        => ((ISourceBlock<T[]>)_source).ReserveMessage(messageHeader, target);

    void ISourceBlock<T[]>.ReleaseReservation(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target)
        => ((ISourceBlock<T[]>)_source).ReleaseReservation(messageHeader, target);
}
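Disclaimer: the behavior of the above implementations is not ideal, in that they produce short batches even in cases where they shouldn't. The ideal behavior would be to produce short batches only when the batches can be propagated instantly to a consumer downstream. Producing short batches and just storing them in the block's output buffer doesn't make much sense. This deviation from the ideal behavior can only be observed if the CreateBatchBlock<T>/BatchUntilInactiveBlock<T> is not pumped rigorously, for example if the linked block downstream is bounded and has reached its maximum capacity.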
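The buffering effect described in the disclaimer can be seen with a plain BatchBlock<T> that has no consumer linked to it. The following is only a sketch under assumed values (batch size 10, five posted items, explicit TriggerBatch calls standing in for the timer): both short batches are produced before anything is ready to receive them, so they simply wait in the block's output buffer.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var batchBlock = new BatchBlock<int>(10);

// No target is linked, so nothing downstream is ready to take a batch.
batchBlock.Post(1);
batchBlock.Post(2);
batchBlock.Post(3);
batchBlock.TriggerBatch();   // stands in for a timer callback

batchBlock.Post(4);
batchBlock.Post(5);
batchBlock.TriggerBatch();   // a second "timeout"

// Only now does a consumer show up and drain the buffered batches.
int[] first = await batchBlock.ReceiveAsync();
int[] second = await batchBlock.ReceiveAsync();

// Both batches are shorter than BatchSize even though no consumer was waiting.
Console.WriteLine($"Batch lengths: {first.Length} and {second.Length}");
```

An ideal implementation would have held these five items back and emitted them together once a consumer became available.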
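Thanks to Drew Marsh for the idea of using a TransformBlock, which helped me greatly with a recent solution. However, I believe the timer needs to be reset AFTER the batch block (i.e. after a batch has been produced, either because the batch size was reached or because TriggerBatch was called explicitly in the timer callback). If the timer is reset every time a single item arrives, it can keep being reset indefinitely without ever actually triggering a batch (constantly pushing the timer's "dueTime" further away).
This would make the code snippet look like the following: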
// The timer is armed up front so that the first batch is also subject to the timeout.
Timer triggerBatchTimer = new Timer(_ => yourBatchBlock.TriggerBatch(), null, 5000, Timeout.Infinite);

TransformBlock<T[], T[]> timeoutTransformBlock = new TransformBlock<T[], T[]>((value) =>
{
    // Restart the countdown only after a batch has actually been produced.
    triggerBatchTimer.Change(5000, Timeout.Infinite);
    return value;
});

yourBufferBlock.LinkTo(yourBatchBlock);
yourBatchBlock.LinkTo(timeoutTransformBlock);
timeoutTransformBlock.LinkTo(yourActionBlock);

// Start the producer which is populating the BufferBlock etc.
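The concern about resetting the timer on every item can be reproduced without any dataflow blocks at all. The sketch below uses assumed numbers (an item every 50 ms for about a second, a 300 ms timeout) and a counter named fired in place of a real TriggerBatch call. While items keep arriving faster than the timeout, the callback is never reached.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

int fired = 0;

// The callback only counts how often the "timeout" would have triggered a batch.
using var timer = new Timer(_ => Interlocked.Increment(ref fired));

// Simulate a steady stream: each arrival pushes the due time 300 ms into the future.
for (int i = 0; i < 20; i++)
{
    timer.Change(300, Timeout.Infinite);
    await Task.Delay(50);
}
int firedDuringStream = Volatile.Read(ref fired);

// Once the stream goes quiet, the timer finally gets a chance to fire.
await Task.Delay(600);
int firedAfterStream = Volatile.Read(ref fired);

Console.WriteLine($"During stream: {firedDuringStream}, after stream: {firedAfterStream}");
```

On an unloaded machine this should report zero callbacks during the stream and one afterwards, which is why a slow but steady producer can delay a per-item timeout for as long as it keeps producing.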
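Here is a solution that builds on the previous answers. This method encapsulates an existing BatchBlock in a block that pushes out batches at least as often as timeout.
The other answers don't handle the case where the batch block is empty when the timer fires. In that case, the other solutions wait until the batch is full. We ran into this issue in our non-production environments, which made testing harder. This solution ensures that once an item has been posted to the BatchBlock, it is propagated after at most timeout seconds.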
public static IPropagatorBlock<T, T[]> CreateTimeoutBatchBlock<T>(BatchBlock<T> batchBlock, int timeout)
{
    var timespan = TimeSpan.FromSeconds(timeout);
    var timer = new Timer(
        _ => batchBlock.TriggerBatch(),
        null,
        timespan,
        timespan);
    var transformBlock = new TransformBlock<T[], T[]>(
        value =>
        {
            // Reset the timer when a batch has been triggered
            timer.Change(timespan, timespan);
            return value;
        });
    batchBlock.LinkTo(transformBlock, new DataflowLinkOptions { PropagateCompletion = true });
    return DataflowBlock.Encapsulate(batchBlock, transformBlock);
}
You can use link options:
_transformManyBlock.LinkTo(_batchBlock, new DataflowLinkOptions {PropagateCompletion = true});
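As a rough illustration of what this link option does, the following sketch assumes a batch size of 4 and a TransformManyBlock that expands a number n into the sequence 0..n-1; the sink block and the batches list are hypothetical names added for the example. With PropagateCompletion set, completing the first block completes the BatchBlock in turn, and a completing BatchBlock emits whatever items it still holds as a final short batch, so no timer is needed to flush the tail of the stream.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var transformManyBlock = new TransformManyBlock<int, int>(n => Enumerable.Range(0, n));
var batchBlock = new BatchBlock<int>(4);

// Hypothetical sink that records every batch it receives.
var batches = new List<int[]>();
var sink = new ActionBlock<int[]>(batch => batches.Add(batch));

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
transformManyBlock.LinkTo(batchBlock, linkOptions);
batchBlock.LinkTo(sink, linkOptions);

transformManyBlock.Post(6);      // expands to 0, 1, 2, 3, 4, 5
transformManyBlock.Complete();   // completion flows down the chain

await sink.Completion;

// One full batch of 4, then the 2 leftover items flushed on completion.
Console.WriteLine(string.Join(" | ", batches.Select(b => string.Join(",", b))));
```

This only helps at the end of the stream; while the pipeline is still running, a timeout mechanism such as the ones above is still needed to flush partial batches.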