如何避免“丢失更新”的问题?每个进程都只修改自己的副本,事后我需要把它们合并回去。是否有推荐的模式或数据结构(例如 multiprocessing.Manager 或某种形式的共享内存)能更简单地解决这个问题?
如何处理“每个任务都修改同一个 Sample 对象,但我们只希望在一个地方得到最终汇总结果”的情况?
下面是一个简化的代码示例。在实际代码中,每个任务都会修改 Sample 的内部数据,但一旦使用了 ProcessPoolExecutor,各任务拿到的 Sample 对象就变成了彼此断开的副本。
import concurrent.futures
class Sample:
    """Toy sample whose per-stage task progress is tracked in-process."""

    def __init__(self, sample_id):
        self.sample_id = sample_id
        # Stage label -> list of per-sub-task completion flags
        # (two sub-tasks per stage, all initially pending).
        self.stage_completion = {stage: [False, False] for stage in ('1', '2', '3')}

    def do_task(self, stage, sub_idx):
        """Mark sub-task *sub_idx* of *stage* as done; returns self for chaining."""
        # Stand-in for the real work.
        print(f"Doing {stage}{sub_idx} for sample {self.sample_id}")
        self.stage_completion[stage][sub_idx] = True
        return self
def run_task(sample_obj, stage, sub_idx):
    """Module-level entry point the executor can pickle; delegates to the sample."""
    updated = sample_obj.do_task(stage, sub_idx)
    return updated
def main():
    """Demonstrate how process workers end up mutating disconnected copies."""
    sample = Sample(sample_id=123)
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # Fan out stage '1' sub-tasks 0 and 1 (i.e. tasks "1a" and "1b").
        futures = [executor.submit(run_task, sample, '1', idx) for idx in (0, 1)]
        # Block until both complete.
        result1, result2 = (fut.result() for fut in futures)
        # Ideally we would confirm stage 1 is fully done before scheduling
        # stage 2, but each result is an independent pickled copy carrying
        # only its own mutation — merging them (or centralizing the state)
        # is exactly the hard part.
        print("Stage 1 results from result1:", result1.stage_completion)
        print("Stage 1 results from result2:", result2.stage_completion)
if __name__ == "__main__":
    main()
一种做法是用 multiprocessing.managers.BaseManager 把对象托管在管理器进程中,各工作进程通过代理访问同一个实例。限制是:由于访问走的是代理(proxy),您需要为所有子对象显式地添加 getter 和 setter 方法。
import concurrent.futures
import multiprocessing.managers
import copy
class SharedSample:
    """Sample state meant to live inside a multiprocessing manager process.

    Workers interact with it only through a proxy, so reads and writes go
    through explicit accessor methods rather than direct attribute access.
    """

    def __init__(self, sample_id):
        self.sample_id = sample_id
        # Stage label -> list of per-sub-task completion flags
        # (two sub-tasks per stage, all initially pending).
        self.stage_completion = {key: [False, False] for key in ('1', '2', '3')}

    def get_sample_id(self):
        """Return this sample's identifier."""
        return self.sample_id

    def set_stage_completion(self, stage, subidx, value):
        """Record completion *value* for sub-task *subidx* of *stage*."""
        self.stage_completion[stage][subidx] = value
# Register the type with the manager so that sampleManager.SharedSample(...)
# constructs the real object inside the manager process and returns a proxy.
multiprocessing.managers.BaseManager.register("SharedSample",SharedSample)
# Must remain a module-level function: if it were a SharedSample method it
# would execute inside the manager process rather than in the worker.
def do_task(sample, stage, sub_idx):
    """Run one sub-task against the (proxied) sample; returns the sample."""
    # Stand-in for the real work; all state access goes through accessors
    # because `sample` is normally a manager proxy.
    print(f"Doing {stage}{sub_idx} for sample {sample.get_sample_id()}")
    sample.set_stage_completion(stage, sub_idx, True)
    return sample
def run_task(sample_obj, stage, sub_idx):
    """Picklable entry point handed to the executor; forwards to do_task."""
    return do_task(sample_obj, stage, sub_idx)
def main():
    """Drive two stage-'1' sub-tasks against a single manager-hosted sample."""
    with multiprocessing.managers.BaseManager() as sampleManager:
        # The manager process owns the one real SharedSample; this call
        # returns a proxy, and the workers receive picklable proxies too,
        # so all mutations land on the same underlying object.
        sample = sampleManager.SharedSample(sample_id=123)
        with concurrent.futures.ProcessPoolExecutor() as executor:
            # Submit tasks 1a and 1b (equivalent to stage '1' indexes [0, 1])
            future1 = executor.submit(run_task, sample, '1', 0)
            future2 = executor.submit(run_task, sample, '1', 1)
            # Wait for them to finish
            result1 = future1.result()
            result2 = future2.result()
            # deepcopy just returns a copy from the manager
            # NOTE(review): this assumes deepcopy of the returned proxy yields a
            # plain SharedSample whose attributes are directly readable — confirm,
            # since an AutoProxy normally exposes only registered methods.
            print("Stage 1 results from result1:", copy.deepcopy(result1).stage_completion)
            print("Stage 1 results from result2:", copy.deepcopy(result2).stage_completion)
if __name__ == "__main__":
    main()