Python workers randomly get stuck


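I'm running into a problem with a multiprocessing script in Python. The script uses the function process_one_flight to process flights. In isolation, every step of the function works as expected, but when it is executed through multiprocessing workers, the script sometimes gets stuck at a random step of process_one_flight. I can't reproduce the error consistently, which complicates troubleshooting.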

Here is a simplified version of the script:

def _worker(q: Queue, unprocessed: list):
    while not q.empty():
        flight = q.get()
        try:
            process_one_flight(flight, pair=pair, baseline=baseline, force=force)
        except Exception as e:
            unprocessed.append(flight)
            print(f"Error processing {flight}: {e}")

manager = Manager()
q = manager.Queue()
unprocessed = manager.list()

for world in db_path.iterdir():
    if "flight" not in world.name or world_name not in world.name:
        continue
    for split in world.iterdir():
        flight_list = list(split.iterdir())
        flight_idx = [int(flight.name) for flight in flight_list]
        sorted_idx = np.argsort(flight_idx)
        sorted_flights = [flight_list[i] for i in sorted_idx]
        for flight in sorted_flights:
            print(flight)
            flight = Flight_lw(world=world.name, split=split.name, flight_name=flight.name)
            flight.add_db_path(db_path)
            flight.build_folders()
            q.put(flight)

if num_workers > 1:
    workers = [Process(target=_worker, args=(q, unprocessed)) for _ in range(num_workers)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
else:
    _worker(q, unprocessed)

Any idea what is going on, or how to reproduce the error?

python multiprocessing
1 Answer

The race condition is here:

    while not q.empty():
        flight = q.get()

If another worker manages to q.get() the last item of the queue after the current worker has checked not q.empty(), the current worker's q.get() will block:

https://docs.python.org/2/library/queue.html#Queue.Queue

"If optional args block is true and timeout is None (the default), block if necessary until an item is available."
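
To make the check-then-get gap concrete, here is a minimal single-process sketch that replays the interleaving by hand. It uses a plain queue.Queue and made-up names (a_saw_item, b_item, a_blocked) purely for illustration; it is not the asker's code. A short timeout stands in for the indefinite block so the sketch terminates.

```python
import queue

q = queue.Queue()
q.put("flight-1")  # only one item is left in the queue

# Worker A checks the queue and sees that it is not empty.
a_saw_item = not q.empty()

# Before A gets to call get(), worker B takes the last item.
b_item = q.get()

# A now calls get() based on its stale check. With the default arguments
# this call would block forever; the timeout turns that into queue.Empty.
try:
    q.get(timeout=0.1)
    a_blocked = False
except queue.Empty:
    a_blocked = True

print(a_saw_item, b_item, a_blocked)
```

With real workers the interleaving depends on scheduling, which would explain why the hang only shows up occasionally.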

You need to rethink how you read from the queue to avoid the race condition. I would suggest something like this:

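import queue  # Python 3 module that defines the Empty exception

while True:
    try:
        # Wait at most 1 second for an item instead of blocking forever
        flight = q.get(block=True, timeout=1)
    except queue.Empty:
        break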

    try:    
        process_one_flight(flight, pair=pair, baseline=baseline, force=force)
   ...

Alternatively, instead of catching the Empty exception and breaking out of the loop, you can simply let it propagate:

while True:
    flight = q.get(block=True, timeout=1)

    try:    
        process_one_flight(flight, pair=pair, baseline=baseline, force=force)
   ...

This also effectively stops the worker, although the exception will be reported, depending on your logging configuration.
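
Putting the pieces together, a complete worker loop might look roughly like the sketch below. It assumes, as in the question, that the queue is fully populated before any worker starts, so an empty queue after the timeout really means "no more work". The names worker, run and process are hypothetical, process is only a stand-in for process_one_flight, and threads are used instead of processes just to keep the example self-contained. A queue created with manager.Queue() raises the same queue.Empty exception, so the loop itself should carry over unchanged.

```python
import queue
import threading


def worker(q, unprocessed, process):
    """Take items from q until it stays empty for one timeout period."""
    while True:
        try:
            item = q.get(block=True, timeout=0.2)
        except queue.Empty:
            return  # nothing left: exit instead of blocking forever
        try:
            process(item)
        except Exception as exc:
            # Remember the failed item and carry on with the next one.
            unprocessed.append(item)
            print(f"Error processing {item}: {exc}")


def run(items, num_workers=4):
    """Fill the queue first, then start the workers and wait for them."""
    q = queue.Queue()
    for item in items:
        q.put(item)
    done, unprocessed = [], []

    def process(item):  # hypothetical stand-in for process_one_flight
        if item % 5 == 0:
            raise ValueError("simulated failure")
        done.append(item)

    threads = [
        threading.Thread(target=worker, args=(q, unprocessed, process))
        for _ in range(num_workers)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(done), sorted(unprocessed)
```

Calling run(range(1, 11)) processes every item exactly once, and all workers exit on their own once the queue has drained. The timeout value is a trade-off: each worker waits that long after the last item before it stops.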
