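I'm having a problem with a multiprocessing script in Python. The script processes flights with the function process_one_flight. Run on its own, every step of that function works as expected, but when it runs inside multiprocessing workers, the script sometimes hangs at a random step of process_one_flight. I can't reproduce the bug consistently, which makes troubleshooting difficult.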
Here is a simplified version of the script:
from multiprocessing import Manager, Process
from queue import Queue

import numpy as np

# db_path, world_name, num_workers, pair, baseline, force, Flight_lw and
# process_one_flight are defined elsewhere in the full script (not shown).

def _worker(q: Queue, unprocessed: list):
    while not q.empty():
        flight = q.get()
        try:
            process_one_flight(flight, pair=pair, baseline=baseline, force=force)
        except Exception as e:
            unprocessed.append(flight)
            print(f"Error processing {flight}: {e}")

manager = Manager()
q = manager.Queue()
unprocessed = manager.list()

for world in db_path.iterdir():
    if "flight" not in world.name or world_name not in world.name:
        continue
    for split in world.iterdir():
        flight_list = list(split.iterdir())
        # Sort flight folders numerically by name
        flight_idx = [int(flight.name) for flight in flight_list]
        sorted_idx = np.argsort(flight_idx)
        sorted_flights = [flight_list[i] for i in sorted_idx]
        for flight in sorted_flights:
            print(flight)
            flight = Flight_lw(world=world.name, split=split.name, flight_name=flight.name)
            flight.add_db_path(db_path)
            flight.build_folders()
            q.put(flight)

if num_workers > 1:
    workers = [Process(target=_worker, args=(q, unprocessed)) for _ in range(num_workers)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
else:
    _worker(q, unprocessed)
Any idea what's going on, or how to reproduce the bug?
The race condition is here:
while not q.empty():
    flight = q.get()
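If another worker manages to q.get() the last item of the queue after the current worker has checked not q.empty(), the current worker's q.get() will block. Since every item was enqueued before the workers started, nothing will ever be put on the queue again, so that worker blocks forever and its join() never returns: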
https://docs.python.org/2/library/queue.html#Queue.Queue
If optional args block is true and timeout is None (the default), block if necessary until an item is available.
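On the "how to reproduce" part: the race depends on timing, so rerunning the script will only hit it occasionally. The sketch below forces the bad interleaving deterministically with two threading.Event objects. It uses threads instead of processes to stay self-contained (an assumption: the check-then-get race is the same for worker processes sharing a Manager queue), the item name "last-flight" is made up, and a short timeout on worker A's get() stands in for the infinite block so the demo terminates.

```python
import queue
import threading

q = queue.Queue()
q.put("last-flight")          # hypothetical last item left in the queue

checked = threading.Event()   # set once worker A has passed the empty() check
stolen = threading.Event()    # set once worker B has taken the last item
result = {}

def worker_a() -> None:
    if not q.empty():         # A sees one item left...
        checked.set()
        stolen.wait()         # ...but is paused here while B runs
        try:
            # A plain q.get() would block forever at this point;
            # the timeout exists only so this demo finishes.
            result["a"] = q.get(timeout=0.5)
        except queue.Empty:
            result["a"] = "blocked (would hang without a timeout)"

def worker_b() -> None:
    checked.wait()            # wait until A has done its empty() check
    result["b"] = q.get()     # B takes the last item first
    stolen.set()

a = threading.Thread(target=worker_a)
b = threading.Thread(target=worker_b)
a.start()
b.start()
a.join()
b.join()
print(result)
```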
You need to rethink how you read from the queue to avoid the race condition. I'd suggest something like:
import queue

while True:
    try:
        flight = q.get(block=True, timeout=1)
    except queue.Empty:
        break
    try:
        process_one_flight(flight, pair=pair, baseline=baseline, force=force)
    ...
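A self-contained version of that loop, runnable as-is, shows every worker draining the queue and exiting cleanly. Assumptions for the sketch: threads stand in for the worker processes, process_one_flight is a hypothetical stand-in that fails on flight 3 so the unprocessed path is exercised, and the pair/baseline/force arguments are dropped.

```python
import queue
import threading

def process_one_flight(flight: int) -> None:
    """Hypothetical stand-in for the real process_one_flight."""
    if flight == 3:
        raise ValueError("bad flight")

def _worker(q: queue.Queue, unprocessed: list) -> None:
    while True:
        try:
            # Take an item or give up in one call: there is no separate
            # empty() check, so no window for another worker to steal it.
            flight = q.get(block=True, timeout=1)
        except queue.Empty:
            break  # queue drained: this worker exits
        try:
            process_one_flight(flight)
        except Exception as e:
            unprocessed.append(flight)
            print(f"Error processing {flight}: {e}")

q = queue.Queue()
for flight in range(10):
    q.put(flight)

unprocessed = []
workers = [threading.Thread(target=_worker, args=(q, unprocessed)) for _ in range(4)]
for w in workers:
    w.start()
for w in workers:
    w.join()  # returns for every worker, about 1 s after the queue empties

print("unprocessed:", unprocessed)
```

Because every item is enqueued before the workers start, q.get_nowait() (equivalent to q.get(block=False)) would work just as well and avoids the final one-second idle wait; the timeout mainly matters if items could still be arriving while workers run. Both multiprocessing.Queue and a Manager().Queue() proxy take the same block/timeout arguments and raise queue.Empty, so the loop should carry over to the original script unchanged.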
Alternatively, instead of catching queue.Empty and breaking out of the loop, you can simply let the exception propagate:
while True:
    flight = q.get(block=True, timeout=1)
    try:
        process_one_flight(flight, pair=pair, baseline=baseline, force=force)
    ...
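This effectively stops the worker once the queue is exhausted, although the unhandled exception will be reported (as a traceback, or in your logs depending on your logging configuration).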