考虑一个昂贵的计算过程,无论它运行多少数据,它都可以很好地扩展,并且需要在交互式 MATLAB 环境中提供,以便随时生成新数据。如果在流程实例已经在进行时新数据到达,我想放弃该流程并重新启动所有现有数据,以便尽早获得迄今为止所有已知数据的结果。
但是,该过程的早期实例可能是由同步函数调用触发的,我仍然希望在替换过程完成后返回一个值。
下面的最小示例类重新创建了这种行为:它有一个将元素添加到存储数组的方法和一个主要工作方法,该方法在第一次执行缓慢操作后将迄今为止所有存储的数据简单地乘以
2
。使用 DoWork
异步调用工作方法 parfeval
,以便可以在不影响最终输出结果的情况下引导任何不完整的调用。cancel
实例化类
classdef ExampleClass < handle
properties
InputData
OutputData
end
properties (Hidden, Transient)
WorkFuture parallel.Future
end
methods
function obj = ExampleClass()
uibutton(uifigure, ButtonPushedFcn = @(~,evt) obj.AddData(evt.Source.Parent.CurrentPoint(1)));
end
function output = DoWork(obj)
pause(2);
drawnow
output = obj.InputData * 2;
end
function val = StoreOutput(obj,val)
obj.OutputData = val;
end
function F = AddData(obj,val)
obj.WorkFuture.cancel;
obj.InputData(end+1) = val;
F = parfeval(parallel.Pool.empty,@obj.DoWork,1).afterAll(@obj.StoreOutput,1);
obj.WorkFuture = F;
end
end
end
会创建一个
h = ExampleClass;
,其中带有一个按钮,可将新数据(鼠标 X 坐标)输入到对象中。连续单击几次将始终导致将许多元素存储在 uifigure
属性中,并在最后一次单击后 2 秒内将相同数量的结果输出存储在 InputData
中。现在假设有一个同步函数需要此过程的结果,其中包括输入值 OutputData
的输出。如果它是进程的唯一客户端,则可以通过调用获得此结果:
n
这将根据需要返回。但是,如果在工作仍在完成时单击按钮添加更多输入数据,则从
result = h.AddData(n).fetchOutputs;
返回的
Future
将被取消,因此结果是错误:AddData
错误发生后不久,包含输入
Error using parallel.Future/fetchOutputs
One or more futures resulted in an error.
Caused by:
Execution of the future was cancelled.
输出的所需数据仍然最终存储在
n
中,但该信息无法到达串行进程 h
。有没有办法定义一个具有相同预期效果(触发异步进程并最终在该进程完成时返回一个值)的同步调用,并且可以容忍取消它触发的初始异步进程?
这可能采取
result = h.AddData(n).fetchOutputs;
将责任“传递接力棒”给另一个人的形式,而不是简单地停止有效,或者通知串行代码可以以某种方式等待的事件,但这两个概念对我来说都没有明确的实现方法,因此任何能够完成这些事情或以第三种方式解决问题的解决方案都会受到欢迎。
这将通过
Future
的实现来确保同步调用等待最新异步进程的完成,即使它启动的原始进程已被取消。
这可以通过维护对最新
ExampleClass
对象的引用并使用循环来等待计算完成来实现:
Future
classdef ExampleClass < handle
properties
InputData
OutputData
LatestFuture parallel.Future
end
methods
function obj = ExampleClass()
uibutton(uifigure, 'ButtonPushedFcn', @(~,evt) obj.AddData(evt.Source.Parent.CurrentPoint(1)));
obj.LatestFuture = parallel.Future;
end
function output = DoWork(obj)
pause(2); % Simulate a slow process
output = obj.InputData * 2;
end
function StoreOutput(obj, val)
obj.OutputData = val;
end
function F = AddData(obj, val)
if ~isempty(obj.WorkFuture) && isvalid(obj.WorkFuture)
cancel(obj.WorkFuture);
end
obj.InputData(end+1) = val;
F = parfeval(@obj.DoWork, 1);
obj.WorkFuture = F;
obj.LatestFuture = F;
end
function output = SyncGetData(obj, val)
F = obj.AddData(val);
while ~strcmp(F.State, 'finished')
pause(0.1); % Wait for the latest future to finish
if ~isempty(obj.LatestFuture) && isvalid(obj.LatestFuture)
F = obj.LatestFuture;
end
end
output = fetchOutputs(F);
end
end
end
属性跟踪最近的异步计算。
LatestFuture
方法被修改为每当新的计算开始时更新AddData
。一种新方法 LatestFuture
会等待最新计算的结果,即使原始计算被取消。SyncGetData
您现在应该使用
+----------------------+
| SyncGetData |
| +------------------+ |
| | Call AddData |-----> Starts new async process
| +------------------+ |
| | Wait Loop |-----> Waits for the latest Future
| +------------------+ | to complete
| | Fetch Output |-----> Retrieves result from the latest
| +------------------+ | completed process
+----------------------+
,而不是直接调用
h.AddData(n).fetchOutputs;
来获取同步结果。