重新进入运行 python 多处理脚本的 tmux 会话时管道损坏

问题描述 投票:0回答:1

我有一个长时间运行的 python 脚本,需要在大学实验室的服务器上执行。我首先启动一个 tmux 会话,而不是直接在 SSH 客户端中执行并冒着连接中断从而导致进程被终止的风险:

tmux new -s preprocessing

并在会话中运行我的脚本:

python3 path/to/my/scirpt.py

之后,我退出会话并关闭 ssh 连接。 一段时间后,我想检查进度,因此我通过 SSH 重新连接到服务器并附加到 tmux 会话:

tmux attach -t preprocessing

我首先在终端中看到脚本的进度 - 它有一个像这样的进度条:

Processing 1st Step:   7%|███▌                                                 | 61725/921600 [11:11:49<155:59:00,  1.53it/s]

但是几秒钟后我收到此错误:

multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
  File "/usr/lib/python3.10/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.10/multiprocessing/pool.py", line 51, in starmapstar
    return list(itertools.starmap(args[0], args[1]))
  File "/home/plodir2/network-predictive-analysis/scripts/data_preprocessing_MILC+LAMMPS+UR_lammps_1056.py", line 419, in process_chunk
    if (application_iteration, router_id) not in list(processed_routers):
  File "<string>", line 2, in __len__
  File "/usr/lib/python3.10/multiprocessing/managers.py", line 817, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/usr/lib/python3.10/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3.10/multiprocessing/connection.py", line 411, in _send_bytes
    self._send(header + buf)
  File "/usr/lib/python3.10/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/plodir2/network-predictive-analysis/scripts/data_preprocessing_MILC+LAMMPS+UR_lammps_1056.py", line 600, in <module>
    pool.starmap(process_chunk, [(application_chunk, data_to_add_list, processed_routers, inactive_nodes_lists_processed_iterations) for application_chunk in application_chunks])
  File "/usr/lib/python3.10/multiprocessing/pool.py", line 375, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
  File "/usr/lib/python3.10/multiprocessing/pool.py", line 774, in get
    raise self._value
BrokenPipeError: [Errno 32] Broken pipe

我认为这与 tmux 会话有关,因为我在附加到它后就得到了它。此外,进度条上显示的 11 小时与我启动脚本和检查其脚本时所经过的时间精确匹配进步。此外,我已经使用较小的数据集运行了脚本,并且它处理它没有错误(在 SSH 中运行它,同时保持连接活动)。

我可以提供有关该脚本的更多详细信息,但它本质上是为了处理 pandas 数据帧,

application_data
将其分成块,每个块都由不同的进程处理。这样做是为了在大量数据的情况下提高性能。

...
if __name__ == "__main__":
    num_processes = 400
    application_chunks = np.array_split(application_data, num_processes)


    with Manager() as manager:
        data_to_add_list = manager.list()
        processed_routers = manager.list()
        inactive_nodes_lists_processed_iterations = manager.list()
        
        progress_counter = Value('i', 0)
        lock = Lock()

        with tqdm(total=len(application_data), desc="Processing 1st Step") as pbar:
            progress_thread = Thread(target=update_progress_bar, args=(len(application_data), pbar))
            progress_thread.start()

            with Pool(num_processes, initializer=init, initargs=(lock, progress_counter)) as pool:
                pool.starmap(process_chunk, [(application_chunk, data_to_add_list, processed_routers, inactive_nodes_lists_processed_iterations) for application_chunk in application_chunks])

            progress_thread.join()
...

进程共享

processed_routers
列表,其读写方式如下:

def process_chunk(chunk, data_to_add_list, processed_routers, inactive_nodes_lists_processed_iterations):

    for application_data_record in chunk.itertuples():
        application_iteration = application_data_record.iteration # integer value
        router_id = int(application_data_record.node // 4)
        ...

        add_router_data_ports = False
        with lock:
            if (application_iteration, router_id) not in list(processed_routers):
                # keep track of the fact that port 0-10 data has been processed for this iteration for this router
                processed_routers.append((application_iteration, router_id))
                add_router_data_ports = True

        if add_router_data_ports:
            # do stuff
...

基本上,列表保存整数元组 (x,y):预期行为是 避免两个进程处理与同一元组相关的数据

python multiprocessing tmux broken-pipe
1个回答
0
投票

这可能不是原因,但对 400 员工进行流程民意调查似乎很不方便且不切实际。

实际上,在这个模型中,400 个工作人员中的每个都会收到 TMUX 伪 tty 文件作为他们的 stdin/stdout/stderr - 当您重新附加一个文件时,重新连接这 1200 个文件时,很可能会出现 tmux 内部的一些边缘情况。会产生此错误的会话。

所以,我会在那里尝试的事情:

  1. 以一种子进程拥有私有的、由根进程拥有的 stdin/stdout/stderr 文件的方式创建子进程,而不是共享根 PTY。 - 如果你能做到这一点,通过正确遵循 Python 多处理和子进程文档上的规范,问题应该以确定性的方式消失。

  2. 在池中使用更合理数量的工作线程 - 请记住,在子进程池中,工作线程被重复使用,并且拥有比可用 CPU 核心的物理数量更多的工作线程没有多大意义。如果您的进程仍然留下大量未使用的 CPU 资源(例如,由于等待 I/O),则必须更改架构中的某些内容以对其进行优化,而不需要 400 个工作线程来最大化您的物理核心 - 可能使用 ASYNCIO,或者在主机或工人。减少工人数量很可能会纠正你的错误,但不是以一种确定性的方式。无论如何,我建议将其与第 (1) 项中建议的修复一起进行。

© www.soinside.com 2019 - 2024. All rights reserved.