I have a long-running Python script that needs to run on a university lab server. Instead of executing it directly in my SSH client and risking the connection dropping and the process being killed, I first start a tmux session:
tmux new -s preprocessing
and run my script inside the session:
python3 path/to/my/script.py
Afterwards, I detach from the session and close the SSH connection. Some time later I want to check on the progress, so I reconnect to the server over SSH and attach to the tmux session:
tmux attach -t preprocessing
At first I see the script's progress in the terminal - it has a progress bar like this:
Processing 1st Step: 7%|███▌ | 61725/921600 [11:11:49<155:59:00, 1.53it/s]
But after a few seconds I get this error:
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
  File "/usr/lib/python3.10/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.10/multiprocessing/pool.py", line 51, in starmapstar
    return list(itertools.starmap(args[0], args[1]))
  File "/home/plodir2/network-predictive-analysis/scripts/data_preprocessing_MILC+LAMMPS+UR_lammps_1056.py", line 419, in process_chunk
    if (application_iteration, router_id) not in list(processed_routers):
  File "<string>", line 2, in __len__
  File "/usr/lib/python3.10/multiprocessing/managers.py", line 817, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/usr/lib/python3.10/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3.10/multiprocessing/connection.py", line 411, in _send_bytes
    self._send(header + buf)
  File "/usr/lib/python3.10/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/plodir2/network-predictive-analysis/scripts/data_preprocessing_MILC+LAMMPS+UR_lammps_1056.py", line 600, in <module>
    pool.starmap(process_chunk, [(application_chunk, data_to_add_list, processed_routers, inactive_nodes_lists_processed_iterations) for application_chunk in application_chunks])
  File "/usr/lib/python3.10/multiprocessing/pool.py", line 375, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
  File "/usr/lib/python3.10/multiprocessing/pool.py", line 774, in get
    raise self._value
BrokenPipeError: [Errno 32] Broken pipe
I think this is related to the tmux session, because I got the error right after attaching to it. Also, the 11 hours shown on the progress bar exactly match the time elapsed between starting the script and checking on it. Moreover, I have already run the script on a smaller dataset and it processed it without errors (running it over SSH while keeping the connection alive).
I can provide more details about the script, but essentially it processes a pandas DataFrame,
application_data
splitting it into chunks, each of which is processed by a different process. This is done to improve performance on large amounts of data.
...
if __name__ == "__main__":
    num_processes = 400
    application_chunks = np.array_split(application_data, num_processes)
    with Manager() as manager:
        data_to_add_list = manager.list()
        processed_routers = manager.list()
        inactive_nodes_lists_processed_iterations = manager.list()
        progress_counter = Value('i', 0)
        lock = Lock()
        with tqdm(total=len(application_data), desc="Processing 1st Step") as pbar:
            progress_thread = Thread(target=update_progress_bar, args=(len(application_data), pbar))
            progress_thread.start()
            with Pool(num_processes, initializer=init, initargs=(lock, progress_counter)) as pool:
                pool.starmap(process_chunk, [(application_chunk, data_to_add_list, processed_routers, inactive_nodes_lists_processed_iterations) for application_chunk in application_chunks])
            progress_thread.join()
...
The processes share the
processed_routers
list, which is read and written as follows:
def process_chunk(chunk, data_to_add_list, processed_routers, inactive_nodes_lists_processed_iterations):
    for application_data_record in chunk.itertuples():
        application_iteration = application_data_record.iteration  # integer value
        router_id = int(application_data_record.node // 4)
        ...
        add_router_data_ports = False
        with lock:
            if (application_iteration, router_id) not in list(processed_routers):
                # keep track of the fact that port 0-10 data has been processed for this iteration for this router
                processed_routers.append((application_iteration, router_id))
                add_router_data_ports = True
        if add_router_data_ports:
            # do stuff
            ...
Basically, the list holds integer tuples (x, y): the expected behavior is that no two processes handle data related to the same tuple.
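For reference, the check-then-append idiom above can be reproduced in a minimal, self-contained sketch. The names here (init, claim, main) are illustrative, not from the original script; the point is just that each distinct tuple gets claimed exactly once when the membership test and the append happen under the same shared lock:

```python
from multiprocessing import Manager, Pool, Lock

lock = None  # populated in each worker by init()

def init(shared_lock):
    global lock
    lock = shared_lock

def claim(pair, seen):
    # Same idiom as process_chunk: copy the proxy list locally,
    # then append under the lock if the tuple is not claimed yet.
    with lock:
        if pair not in list(seen):
            seen.append(pair)
            return True   # this call "owns" the tuple
    return False

def main():
    with Manager() as manager:
        seen = manager.list()
        pairs = [(1, 7), (1, 7), (2, 3)]  # (1, 7) appears twice on purpose
        with Pool(2, initializer=init, initargs=(Lock(),)) as pool:
            claims = pool.starmap(claim, [(p, seen) for p in pairs])
        # copy the proxy while the manager is still alive
        return sorted(seen), claims

if __name__ == "__main__":
    deduped, claims = main()
    print(deduped)      # each distinct tuple appears once
    print(sum(claims))  # exactly one successful claim per distinct tuple
```

Note that every `pair not in list(seen)` test ships the entire list over the manager's connection, which is also exactly the code path (`_callmethod` → `conn.send`) where your BrokenPipeError surfaces.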
This may not be the cause, but a process Pool with 400 workers seems unwieldy and impractical.
In fact, in this model each of the 400 workers inherits the tmux pseudo-tty as its stdin/stdout/stderr - and when you re-attach the session, reconnecting those 1200 file descriptors could well hit some edge case inside tmux that produces this error.
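One way to test that hypothesis is to make sure no worker keeps a handle on the pseudo-tty at all. A sketch of a Pool initializer that redirects each worker's stdio (the function name and log path are illustrative, not from the original script):

```python
import os
import sys

def detach_worker_stdio(log_path="/tmp/worker.log"):
    # Hypothetical Pool initializer: point each worker's stdout/stderr
    # at a log file, and stdin at /dev/null, so that no worker process
    # holds a file descriptor on the tmux pseudo-tty.
    log = open(log_path, "a", buffering=1)
    os.dup2(log.fileno(), sys.stdout.fileno())
    os.dup2(log.fileno(), sys.stderr.fileno())
    devnull = open(os.devnull, "r")
    os.dup2(devnull.fileno(), sys.stdin.fileno())

# usage sketch: Pool(num_processes, initializer=detach_worker_stdio)
```

If the error disappears with workers detached from the tty, that would support the re-attach theory; if it persists, the cause is elsewhere (e.g. the manager process dying).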
So, things I would try there: