如何同步返回未知顺序的异步过程的结果?

问题描述 投票:0回答:1

考虑一个昂贵的计算过程,无论它运行多少数据,它都可以很好地扩展,并且需要在交互式 MATLAB 环境中提供,以便随时生成新数据。如果在流程实例已经在进行时新数据到达,我想放弃该流程并重新启动所有现有数据,以便尽早获得迄今为止所有已知数据的结果。

但是,该过程的早期实例可能是由同步函数调用触发的,我仍然希望在替换过程完成后返回一个值。

下面的最小示例类重新创建了这种行为:它有一个将元素添加到存储数组的方法和一个主要工作方法,该方法在第一次执行缓慢操作后将迄今为止所有存储的数据简单地乘以

2
。使用
DoWork
异步调用工作方法
parfeval
,以便可以在不影响最终输出结果的情况下引导任何不完整的调用。
cancel

实例化类 
classdef ExampleClass < handle properties InputData OutputData end properties (Hidden, Transient) WorkFuture parallel.Future end methods function obj = ExampleClass() uibutton(uifigure, ButtonPushedFcn = @(~,evt) obj.AddData(evt.Source.Parent.CurrentPoint(1))); end function output = DoWork(obj) pause(2); drawnow output = obj.InputData * 2; end function val = StoreOutput(obj,val) obj.OutputData = val; end function F = AddData(obj,val) obj.WorkFuture.cancel; obj.InputData(end+1) = val; F = parfeval(parallel.Pool.empty,@obj.DoWork,1).afterAll(@obj.StoreOutput,1); obj.WorkFuture = F; end end end

会创建一个

h = ExampleClass;
,其中带有一个按钮,可将新数据(鼠标 X 坐标)输入到对象中。连续单击几次将始终导致将许多元素存储在
uifigure
属性中,并在最后一次单击后 2 秒内将相同数量的结果输出存储在
InputData
中。
现在假设有一个同步函数需要此过程的结果,其中包括输入值 

OutputData

的输出。如果它是进程的唯一客户端,则可以通过调用获得此结果:

n

这将根据需要返回。但是,如果在工作仍在完成时单击按钮添加更多输入数据,则从 
result = h.AddData(n).fetchOutputs;

返回的

Future
将被取消,因此结果是错误:
AddData

错误发生后不久,包含输入 
Error using parallel.Future/fetchOutputs One or more futures resulted in an error. Caused by: Execution of the future was cancelled.

输出的所需数据仍然最终存储在

n
中,但该信息无法到达串行进程
h
有没有办法定义一个具有相同预期效果(触发异步进程并最终在该进程完成时返回一个值)的同步调用,并且可以容忍取消它触发的初始异步进程?

这可能采取

result = h.AddData(n).fetchOutputs;

将责任“传递接力棒”给另一个人的形式,而不是简单地停止有效,或者通知串行代码可以以某种方式等待的事件,但这两个概念对我来说都没有明确的实现方法,因此任何能够完成这些事情或以第三种方式解决问题的解决方案都会受到欢迎。

    

matlab asynchronous
1个回答
0
投票
有没有办法定义一个具有相同预期效果(触发异步进程并最终在该进程完成时返回一个值)的同步调用,并且可以容忍取消它触发的初始异步进程?

这将通过
Future

的实现来确保同步调用等待最新异步进程的完成,即使它启动的原始进程已被取消。

这可以通过维护对最新 

ExampleClass 对象
的引用并使用循环来
等待计算完成
来实现: Future

 
classdef ExampleClass < handle properties InputData OutputData LatestFuture parallel.Future end methods function obj = ExampleClass() uibutton(uifigure, 'ButtonPushedFcn', @(~,evt) obj.AddData(evt.Source.Parent.CurrentPoint(1))); obj.LatestFuture = parallel.Future; end function output = DoWork(obj) pause(2); % Simulate a slow process output = obj.InputData * 2; end function StoreOutput(obj, val) obj.OutputData = val; end function F = AddData(obj, val) if ~isempty(obj.WorkFuture) && isvalid(obj.WorkFuture) cancel(obj.WorkFuture); end obj.InputData(end+1) = val; F = parfeval(@obj.DoWork, 1); obj.WorkFuture = F; obj.LatestFuture = F; end function output = SyncGetData(obj, val) F = obj.AddData(val); while ~strcmp(F.State, 'finished') pause(0.1); % Wait for the latest future to finish if ~isempty(obj.LatestFuture) && isvalid(obj.LatestFuture) F = obj.LatestFuture; end end output = fetchOutputs(F); end end end

属性跟踪最近的异步计算。

LatestFuture
方法被修改为每当新的计算开始时更新
AddData
。一种新方法
LatestFuture
会等待最新计算的结果,即使原始计算被取消。
SyncGetData

您现在应该使用 
+----------------------+ | SyncGetData | | +------------------+ | | | Call AddData |-----> Starts new async process | +------------------+ | | | Wait Loop |-----> Waits for the latest Future | +------------------+ | to complete | | Fetch Output |-----> Retrieves result from the latest | +------------------+ | completed process +----------------------+

,而不是直接调用

h.AddData(n).fetchOutputs;
来获取同步结果。
    

© www.soinside.com 2019 - 2024. All rights reserved.