我使用带有专用“包装器”(我们称之为
sw
)的 FEA 软件,通过以下方式提供“不透明”模型处理程序 model = sw.Load("myfile.ext")
。
为了跟踪计算进度,包装器允许我们以
def progressHandlerFn(model_name, progress)
的形式附加自定义“进度处理函数”,我们只需使用合成器将其分配给模型:model.progressHanlderFunction = progressHandlerFn
。
然后我们可以通过命令
model.Run()
运行模型。
在模拟过程中,我们的自定义函数被调用定期(由软件自动),我们可以进行基本操作,例如记录或打印进度。
为了处理多次运行,我使用
multiprocessing
包。在这种情况下,我希望能够使用进度条跟踪我的所有模拟。我尝试过几个套餐,分别是progress
、alive-progress
、rich
、tqdm
和atpbar
。他们似乎都在与 multiprocessing
引入的并行化作斗争。
主要的困难是:这些库中的大多数都使用合成器
with bar() as bar
,它与并行化不太匹配(据我所知)。
我能得到的最接近的是“手动”使用
tqdm
,它允许手动初始化和更新栏。但它仍然没有按预期工作(参见下面的代码)。
import sw
import multiprocessing
PARALLEL_RUNS = 2
names_to_bar = {}
def progressHandlerFn(model_name, progress):
if model_name not in names_to_bar.keys():
names_to_bar[model_name] = {"bar": tqdm(total=100), "last_progress": 0}
progress = 100 * (time - start) / (stop - start)
names_to_bar[model_name]["bar"].update(progress - names_to_bar[model_name]["last_progress"])
names_to_bar[model_name]["last_progress"] += 1
def run_simulation(file):
print(f"Loading and running {file}")
m = sw.Load(file)
m.progressHanlderFunction = progressHandlerFn
m.Run()
def run_simulations(files, parallel_runs):
with multiprocessing.Pool(parallel_runs) as pool:
pool.map(run_simulation, files)
if __name__ == "__main__":
run_simulations(model_files, PARALLEL_RUNS)
for pbar in names_to_bar.values():
pbar.close()
当我这样做时,我在命令行界面中获得一行,该行被第一个模型和第二个模型进程交替“擦除”。基本上,我希望看到两个独立的进度条,但我只得到一个。
顺便说一句,我后来注意到
names_to_bar
变量可能不会在池之间共享(因为 multiprocessing
就是这样工作的),但这实际上不应该成为问题,因为我认为每个池都会有自己的字典(只有一项)。
我的主要问题,正如您可能已经猜到的那样,是我没有手在
progressHandlerFn
参数上,所以我无法添加我在上面的级别中设置的变量...
我建议在主流程中管理所有进度条。
Enlighten 存储库中有一个示例,其中工作人员只需将计数放入队列中,主进程会在工作队列上循环以更新进度条。应该很容易适应您的用例。
https://github.com/Rockhopper-Technologies/enlighten/blob/main/examples/multiprocessing_queues.py
在此示例中,每个工作程序处理一个系统,系统数量在 10 到 20 个之间,但工作程序固定为 4 个。每个工作程序都有自己的进度条,并且有一个单独的进度条来跟踪总体进度。主进度条有3种颜色,黄色表示已开始,绿色表示已完成,红色表示错误。