多处理是在单个计算机系统中使用两个或更多个中央处理单元(CPU)
我想同时执行以下两个函数。 来自多处理导入过程 导入操作系统 导入日期时间 def func_1(标题): now = datetime.datetime.now() 打印“
这是这个问题的一个分支。 python 中的代码运行良好。当我尝试 cythonized 版本时,我开始收到“Can't pickle 这是这个问题的分支。 python 中的代码运行良好。当我尝试 cythonized 版本时,尽管我在顶层定义了 init_worker_processes,但我开始收到“Can't pickle ”。因此,我将其移至另一个模块并使用导入的 init_worker_processes。现在,我收到以下错误: error: unrecognized arguments: -s -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=8, pipe_handle=16) --multiprocessing-fork Python3/lib/python3.9/multiprocessing/resource_tracker.py:96: UserWarning: resource_tracker: process died unexpectedly, relaunching. Some resources might leak. warnings.warn('resource_tracker: process died unexpectedly, ' 我没有明确使用错误中报告的 -s 或 -c。该错误来自多处理库中的以下代码(方法 - ensure_running) warnings.warn('resource_tracker: process died unexpectedly, ' 'relaunching. Some resources might leak.') 如何解决这个问题? # updated Python code # ---------------------- mp_app.py ------------------ import argparse import logging import signal import sys import time import multiprocessing as mp from dataclasses import dataclass from typing import Dict, NoReturn import numpy as np from mp_utils import init_worker_processes @dataclass class TmpData: name: str value: int def worker(name: str, data: TmpData) -> NoReturn: logger_obj = mp.get_logger() logger_obj.info(f"processing : {name}; value: {data.value}") time.sleep(data.value) def get_args(logger: logging.Logger) -> argparse.Namespace: parser = argparse.ArgumentParser(description="test MP app") parser.add_argument( "-m", "--max-time", type=int, dest="max_time", required=True, help="max timeout in seconds", ) parser.add_argument( "-j", dest="num_workers", type=int, default=1, required=False, help=argparse.SUPPRESS, ) try: args = parser.parse_args() except argparse.ArgumentError as err: logger.exception(parser.print_help()) raise err return args def mp_app(options: argparse.Namespace, logger: logging.Logger) -> NoReturn: map_data: Dict[str, TmpData] = { key: TmpData(name=key, value=np.random.randint(1, options.max_time)) for key in ["ABC", "DEF", "GHI", "JKL", "PQR", "STU", "XYZ"] } with mp.get_context("fork").Pool( processes=options.num_workers, initializer=init_worker_processes, ) as pool: results = [] for key in map_data: try: results.append( pool.apply_async( worker, args=( key, map_data[key], ), ) ) except KeyboardInterrupt: pool.terminate() pool.close() pool.join() for result in results: try: result.get() except Exception as err: logger.error(f"{err}") if __name__ == "__main__": main_logger = logging.getLogger() try: args = get_args(main_logger) mp_app(options=args, logger=main_logger) except Exception as e: main_logger.error(e) raise SystemExit(1) from e sys.exit(0) # --------------------- mp_utils.py -------------------------- import multiprocessing import logging import signal from typing import NoReturn def init_worker_processes() -> NoReturn: """ Initializes each worker to handle signals Returns: None """ this_process_logger = multiprocessing.log_to_stderr() this_process_logger.setLevel(logging.INFO) signal.signal(signal.SIGINT, signal.SIG_IGN) 请注意,主要问题似乎是“-s”和“-c”选项无法识别;不确定那些来自哪里。 编辑2: 虽然我仍在尝试破译 cython 构建过程,因为它是通过我们环境中的复杂 make 系统发生的。但是,我猜测我能够追踪 -s 和 -c 选项的根本原因。 -s似乎来自subprocess.py(模块subprocess)中的_args_from_interpreter_flags方法。 在我的 python shell 中,我看到 sys.flags 如下 - >>> sys.flags sys.flags(debug=0, inspect=0, interactive=0, optimize=0, dont_write_bytecode=0, no_user_site=1, no_site=0, ignore_environment=0, verbose=0, bytes_warning=0, quiet=0, hash_randomization=1, isolated=0, dev_mode=False, utf8_mode=0, int_max_str_digits=-1) 由于 sys.flags.no_user_site 为 1,因此 -s 似乎已被附加。 spawn.py 中的 get_command_line 似乎添加了 -c。由于该分支来自 if getattr(sys, 'frozen', False) 的其他分支,spwan 方法是否不应该与 cythonized 二进制文件一起使用? 编辑-3: 我尝试了“fork”和“spawn”。两者都可以在 Python 中运行。但是,通过 cythonized 构建,基于“spawn”的应用程序,我收到“UserWarning:resource_tracker:进程意外死亡,正在重新启动。某些资源可能泄漏”消息以及 -s 和 -c 的“无法识别的参数”。基于“fork”的应用程序的 cythonized 版本只是在启动时挂起,就好像它在等待某个锁一样。我尝试过 pstack 进程 ID,但找不到任何东西 - # top 20 frames from pstack #0 0x00007ffff799675d in read () from /usr/lib64/libpthread.so.0 #1 0x00007ffff70c3996 in _Py_read (fd=fd@entry=3, buf=0x7fffbabfdbf0, count=count@entry=4) at Python/fileutils.c:1707 #2 0x00007ffff70ce872 in os_read_impl (module=<optimized out>, length=4, fd=3) at ./Modules/posixmodule.c:9474 #3 os_read (module=<optimized out>, nargs=<optimized out>, args=<optimized out>) at ./Modules/clinic/posixmodule.c.h:5012 #4 os_read (module=<optimized out>, args=<optimized out>, nargs=<optimized out>) at ./Modules/clinic/posixmodule.c.h:4977 #5 0x00007ffff6fc444f in cfunction_vectorcall_FASTCALL (func=0x7ffff7f4aa90, args=0x7fffbae68f10, nargsf=<optimized out>, kwnames=<optimized out>) at Objects/methodobject.c:430 #6 0x00007ffff6f303ec in _PyObject_VectorcallTstate (kwnames=0x0, nargsf=<optimized out>, args=0x7fffbae68f10, callable=0x7ffff7f4aa90, tstate=0x419cb0) at ./Include/cpython/abstract.h:118 #7 PyObject_Vectorcall (kwnames=0x0, nargsf=<optimized out>, args=0x7fffbae68f10, callable=<optimized out>) at ./Include/cpython/abstract.h:127 #8 call_function (kwnames=0x0, oparg=<optimized out>, pp_stack=<synthetic pointer>, tstate=0x419cb0) at Python/ceval.c:5077 #9 _PyEval_EvalFrameDefault (tstate=<optimized out>, f=<optimized out>, throwflag=<optimized out>) at Python/ceval.c:3520 #10 0x00007ffff7066ea4 in _PyEval_EvalFrame (throwflag=0, f=0x7fffbae68d60, tstate=0x419cb0) at ./Include/internal/pycore_ceval.h:40 #11 _PyEval_EvalCode (tstate=tstate@entry=0x419cb0, _co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, args=<optimized out>, argcount=2, kwnames=0x0, kwargs=0x7fffbac0b750, kwcount=0, kwstep=1, defs=0x7fffbae95298, defcount=1, kwdefs=0x0, closure=0x0, name=0x7fffbae8d230, qualname=0x7fffbae8c210) at Python/ceval.c:4329 #12 0x00007ffff6f7baba in _PyFunction_Vectorcall (func=<optimized out>, stack=<optimized out>, nargsf=<optimized out>, kwnames=<optimized out>) at Objects/call.c:396 #13 0x00007ffff6f311aa in _PyObject_VectorcallTstate (kwnames=0x0, nargsf=<optimized out>, args=0x7fffbac0b740, callable=0x7fffbabefe50, tstate=0x419cb0) at ./Include/cpython/abstract.h:118 #14 PyObject_Vectorcall (kwnames=0x0, nargsf=<optimized out>, args=0x7fffbac0b740, callable=<optimized out>) at ./Include/cpython/abstract.h:127 #15 call_function (kwnames=0x0, oparg=<optimized out>, pp_stack=<synthetic pointer>, tstate=0x419cb0) at Python/ceval.c:5077 #16 _PyEval_EvalFrameDefault (tstate=<optimized out>, f=<optimized out>, throwflag=<optimized out>) at Python/ceval.c:3506 #17 0x00007ffff7066ea4 in _PyEval_EvalFrame (throwflag=0, f=0x7fffbac0b5b0, tstate=0x419cb0) at ./Include/internal/pycore_ceval.h:40 #18 _PyEval_EvalCode (tstate=tstate@entry=0x419cb0, _co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, args=<optimized out>, argcount=2, kwnames=0x0, kwargs=0x7fffbac08f58, kwcount=0, kwstep=1, defs=0x7fffbae83b08, defcount=1, kwdefs=0x0, closure=0x0, name=0x7fffbae84830, qualname=0x7fffbae8c3f0) at Python/ceval.c:4329 #19 0x00007ffff6f7baba in _PyFunction_Vectorcall (func=<optimized out>, stack=<optimized out>, nargsf=<optimized out>, kwnames=<optimized out>) at Objects/call.c:396 #20 0x00007ffff6f311aa in _PyObject_VectorcallTstate (kwnames=0x0, nargsf=<optimized out>, args=0x7fffbac08f48, callable=0x7fffbabeff70, tstate=0x419cb0) at ./Include/cpython/abstract.h:118 我检查了 cython 构建过程打印如下内容: cython --3str --embed --no-docstrings -o mp_app.c mp_app.py gcc -Os -Loss/Python3/lib -DNDEBUG -Wl,--strip-all -IPython-3.9.15/include/python3.9 -LPython-3.9.15/lib/python3.9/config-3.9-x86_64-linux-gnu -LPython-3.9.15/lib -lcrypt -lpthread -ldl -lutil -lm -lm -B /binutils/bin -static-libgcc -static-libstdc++ -fPIC -lpython3.9 mp_app.c -o mp_app.pex PS:我还编辑了源代码示例 我能够通过从 fork 启动方法切换到 spawn 启动方法来解决该问题。此外,我必须将 worker 方法从 mp_app.py 移动到 mp_utils.py,否则在 cythonized 版本中,它会为 cythonized 函数 PicklingError 抛出 worker。 我仍然不确定为什么spawn启动方法在CentOS7机器上对我不起作用。 最终代码大致如下: # ------------- mp_app.py ------------------ import argparse import logging import signal import sys import multiprocessing as mp from typing import Dict, NoReturn import numpy as np from mp_utils import (init_worker_processes, worker_task, InterProcessData) def get_args(logger: logging.Logger) -> argparse.Namespace: parser = argparse.ArgumentParser(description="test MP app") parser.add_argument( "-m", "--max-time", type=int, dest="max_time", required=True, help="max timeout in seconds", ) parser.add_argument( "-j", dest="num_workers", type=int, default=1, required=False, help=argparse.SUPPRESS, ) try: args = parser.parse_args() except argparse.ArgumentError as err: logger.exception(parser.print_help()) raise err return args def mp_app(options: argparse.Namespace, logger: logging.Logger) -> NoReturn: map_data: Dict[str, InterProcessData] = { key: InterProcessData(name=key, value=np.random.randint(1, options.max_time)) for key in ["ABC", "DEF", "GHI", "JKL", "PQR", "STU", "XYZ"] } with mp.get_context("fork").Pool( processes=options.num_workers, initializer=init_worker_processes, ) as pool: results = [] for key in map_data: try: results.append( pool.apply_async( worker_task, args=( key, map_data[key], ), ) ) except KeyboardInterrupt: pool.terminate() pool.close() pool.join() for result in results: try: result.get() except Exception as err: logger.error(f"{err}") if __name__ == "__main__": main_logger = logging.getLogger() try: args = get_args(main_logger) mp_app(options=args, logger=main_logger) except Exception as e: main_logger.error(e) raise SystemExit(1) from e sys.exit(0) # ---------- mp_utils.py ----------- import time import logging import signal import multiprocessing from dataclasses import dataclass from typing import NoReturn @dataclass class InterProcessData: name: str value: int def worker_task(name: str, data: InterProcessData) -> NoReturn: logger_obj = multiprocessing.get_logger() logger_obj.info(f"processing : {name}; value: {data.value}") time.sleep(data.value) def init_worker_processes() -> NoReturn: this_process_logger = multiprocessing.log_to_stderr() this_process_logger.setLevel(logging.INFO) signal.signal(signal.SIGINT, signal.SIG_IGN)
我在快速的本地 SSD 上存储了大量图像文件(约 220,000 个)。使用 python 和 tifffile 库,我将图像读取为 numpy 数组,然后将其组合成一个数组......
我在快速的本地 SSD 上存储了大量图像文件(约 220,000 个)。使用 python 和 tifffile 库,我将图像读取为 numpy 数组,然后将其组合成一个数组......
我在Python中有一个父类,可以在子类中启动一个进程。子类有一个 multiprocessing.Process 可以更改一些变量。我希望人们能够看到变化...
我有一个问题要问对Python多处理库有更多经验的人,我现在几乎迷失了。 我目前正在构建一个应该在两者上运行的图像处理应用程序
我正在尝试在并行处理Python脚本中写入同一个共享数组。 当我在课堂外以正常脚本进行操作时,一切正常。但是当我尝试通过课堂来做到这一点时...
在没有队列的多处理中使用Process时如何返回函数的结果?
我有一些使用 Pytest 的测试,我想确认一下: 测试按顺序运行(没有并行处理,顺序无关紧要) 每个测试都在一个新进程中运行。 我正在尝试使用多进程...
Python 进程(不包括主进程)在 docker 内无法访问互联网。 我有一个 BotManager 类,它为每个 AsyncTelebot 启动一个新进程。它有方法: 异步定义
我目前正在开发一个多处理Python程序,其中每个进程将其索引作为连续的4字节整数写入共享内存。并且有一个阅读器读取其他进程的...
我目前正在开发一个多处理Python程序,其中每个进程将其索引作为连续的4字节整数写入共享内存。并且有一个阅读器读取其他进程的...
我有一个Python程序,我想同时运行多个具有不同参数的实例。这些实例彼此之间不进行通信。在单独的终端中运行每个实例...
time.perf_counter() 或 time.perf_counter_ns() 给出奇怪且不正确的值
我在使用多处理时写了这个简单的东西。 来自多处理导入进程,cpu_count 导入时间 def 计数器(数字): 计数 = 0 数数的同时 < num: c...
我有一个大型数据框,我需要对其进行大量匹配操作,并且过去一直使用以下方法来执行此操作。然而,我目前正在尝试的数据框
此代码可以运行 我得到一个空数组“链接” 虽然我应该得到一个包含对象的数组, 当我在 func_b 中打印链接时,它不为空 Self 应该指的是对象,但我猜
Jupyter 笔记本永远不会使用多处理完成处理(Python 3)
Jupyter 笔记本 我基本上使用多处理模块,我仍在学习多处理的功能。我正在使用达斯蒂·菲利普斯的书,这个代码属于它。 导入
在Python多处理中执行二进制信号量或互斥体以进行上下文切换操作
我正在尝试自动化win应用程序和java应用程序之间的同步关系。 我的标准是: 启动 win 和 jav 应用程序 在jav应用程序中执行命令 等待jav的回复
上下文 系统:Fedora 40 使用的库:pthread.h 详细信息:如果不感兴趣,请阅读“重点” 我很抱歉这里没有提供代码,因为整个程序有超过 4000 行代码......
我正在尝试编写一个函数来并行读取大量文件。我的代码如下: 将 numpy 导入为 np 从多处理导入池 从 functools 导入部分 定义
我有一个python程序,它分为三个文件,dielectric_functions.py,routines.py和cartmoments.py,组织如下-- dielectric_functions.py 是程序的入口点...