How can I preserve and reconcile mutable object state across parallel tasks when pickling breaks it in Python multiprocessing? We are dealing with a concurrency problem in which a single "Sample" object has multiple dependent tasks that must run in stages. For example, stage 1 has tasks (1A, 1B), stage 2 has t...

Problem description
What are the best practices for orchestrating dependent tasks, so that all stage-1 tasks are finished before I start stage 2, while keeping track of what has been done?

How do I avoid the "lost mutation" problem, where each process modifies its own copy and I then have to merge them back together? Is there a recommended pattern or data structure (e.g. multiprocessing.Manager, or some form of shared memory) that makes this simpler?

How do I handle the scenario where every task modifies the same Sample object, but we only want the final, aggregated result in one place?

Below is a simplified code example. In the real code each task modifies the Sample's internal data, but as soon as we use ProcessPoolExecutor, references to the Sample object become disconnected copies.

import concurrent.futures

class Sample:
    def __init__(self, sample_id):
        self.sample_id = sample_id
        # For illustration, let's track stages like {'1': [False, False], '2': [False, False], ...}
        self.stage_completion = {
            '1': [False, False],
            '2': [False, False],
            '3': [False, False]
        }

    def do_task(self, stage, sub_idx):
        # Do some work here
        print(f"Doing {stage}{sub_idx} for sample {self.sample_id}")
        self.stage_completion[stage][sub_idx] = True
        return self  # Return self for convenience

def run_task(sample_obj, stage, sub_idx):
    return sample_obj.do_task(stage, sub_idx)

def main():
    sample = Sample(sample_id=123)
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # Submit tasks 1a and 1b (equivalent to stage '1' indexes [0, 1])
        future1 = executor.submit(run_task, sample, '1', 0)
        future2 = executor.submit(run_task, sample, '1', 1)
        # Wait for them to finish
        result1 = future1.result()
        result2 = future2.result()
        # Now I'd like to check if stage 1 is fully done before scheduling stage 2
        # But result1 and result2 are separate copies with their own state
        # This is where merging states or having a centralized tracking is tricky
        print("Stage 1 results from result1:", result1.stage_completion)
        print("Stage 1 results from result2:", result2.stage_completion)

if __name__ == "__main__":
    main()
  • As you can see, each returned Sample object may only have a partial view of the overall state. I would prefer a solution that keeps them in sync, or merges them easily, without having to write a manual "merge function" for every internal data structure.
  • What is the recommended design pattern or approach in Python for managing (and eventually reconciling) mutable state across parallel tasks, so that I can coordinate dependent tasks without losing a unified view of the shared object? Hints or examples using multiprocessing or a better-suited library would be greatly appreciated.
  • Our guess is that the simplest approach would be to store the object in a separate database, but all the calls to that database would probably make it slow...
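Not part of the question, but one common pattern that avoids per-field merge functions entirely is to keep the authoritative Sample in the parent process and have workers return only small, picklable "delta" tuples that the parent applies as futures complete. The `apply_result` method name below is illustrative, not from the original code:

```python
import concurrent.futures

def run_task(sample_id, stage, sub_idx):
    # The worker does the heavy lifting and returns only a small,
    # picklable description of what it finished -- not the object.
    return (stage, sub_idx)

class Sample:
    def __init__(self, sample_id):
        self.sample_id = sample_id
        self.stage_completion = {'1': [False, False], '2': [False, False]}

    def apply_result(self, stage, sub_idx):
        # The parent is the single writer, so no merging is needed.
        self.stage_completion[stage][sub_idx] = True

def main():
    sample = Sample(123)
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = [executor.submit(run_task, sample.sample_id, '1', i) for i in (0, 1)]
        for fut in concurrent.futures.as_completed(futures):
            sample.apply_result(*fut.result())
    # Stage 1 is now fully recorded in exactly one place
    print(sample.stage_completion['1'])  # -> [True, True]

if __name__ == "__main__":
    main()
```

The trade-off: this works well when each task's result is small relative to the work done, but it does require deciding up front what the minimal "result" of each task is.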
  • You can use multiprocessing.managers.BaseManager.

The limitation is that, because you are working through a proxy, you need to add getters and setters for all the sub-objects you want to read or modify.
import concurrent.futures
import multiprocessing.managers

class SharedSample:
    def __init__(self, sample_id):
        self.sample_id = sample_id
        # For illustration, let's track stages like {'1': [False, False], '2': [False, False], ...}
        self.stage_completion = {
            '1': [False, False],
            '2': [False, False],
            '3': [False, False]
        }

    def get_sample_id(self):
        return self.sample_id

    def set_stage_completion(self, stage, sub_idx, value):
        self.stage_completion[stage][sub_idx] = value

    def get_stage_completion(self):
        # Attributes are not readable through the proxy, so expose a
        # getter; the dict is pickled back to the caller as a copy.
        return self.stage_completion

# Register the type with the manager
multiprocessing.managers.BaseManager.register("SharedSample", SharedSample)

# This cannot be a member method of SharedSample, otherwise it would run in the manager process
def do_task(sample, stage, sub_idx):
    # Do some work here
    print(f"Doing {stage}{sub_idx} for sample {sample.get_sample_id()}")
    sample.set_stage_completion(stage, sub_idx, True)
    return sample  # Return the proxy for convenience

def run_task(sample_obj, stage, sub_idx):
    return do_task(sample_obj, stage, sub_idx)

def main():
    with multiprocessing.managers.BaseManager() as sampleManager:
        sample = sampleManager.SharedSample(sample_id=123)
        with concurrent.futures.ProcessPoolExecutor() as executor:
            # Submit tasks 1a and 1b (equivalent to stage '1' indexes [0, 1])
            future1 = executor.submit(run_task, sample, '1', 0)
            future2 = executor.submit(run_task, sample, '1', 1)
            # Wait for them to finish
            result1 = future1.result()
            result2 = future2.result()
        # Both results are proxies for the same shared object,
        # so both show the fully merged state
        print("Stage 1 results from result1:", result1.get_stage_completion())
        print("Stage 1 results from result2:", result2.get_stage_completion())

if __name__ == "__main__":
    main()
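If wrapping every attribute in getters and setters feels heavy, a lighter Manager-based variant (my assumption, not from the answer above) is to share only the completion bookkeeping via the built-in `Manager().dict()` and `Manager().list()` proxies, and keep the Sample itself local to each worker:

```python
import concurrent.futures
import multiprocessing

def run_task(status, stage, sub_idx):
    # Workers mutate only the shared bookkeeping; heavy per-task data
    # can be returned normally and aggregated by the parent.
    status[stage][sub_idx] = True  # item assignment is forwarded to the manager

def main():
    with multiprocessing.Manager() as manager:
        # One shared list proxy per stage, held in a shared dict proxy.
        status = manager.dict({
            '1': manager.list([False, False]),
            '2': manager.list([False, False]),
        })
        with concurrent.futures.ProcessPoolExecutor() as executor:
            futs = [executor.submit(run_task, status, '1', i) for i in (0, 1)]
            for f in futs:
                f.result()
        if all(status['1']):
            print("stage 1 complete; safe to schedule stage 2")

if __name__ == "__main__":
    main()
```

Nested proxies like this (a list proxy stored inside a dict proxy) are supported by the stdlib's `SyncManager`, so no custom class registration is needed; the cost is that every read and write is still a round trip to the manager process.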

python multiprocessing pickle python-multiprocessing python-multithreading