这是这个问题的后续分支。纯 Python 中的代码运行良好。当我尝试 cython 化的版本时,尽管我在模块顶层定义了 init_worker_processes,但我开始收到 “Can't pickle” 错误,并伴随如下输出:
error: unrecognized arguments: -s -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=8, pipe_handle=16) --multiprocessing-fork
Python3/lib/python3.9/multiprocessing/resource_tracker.py:96: UserWarning: resource_tracker: process died unexpectedly, relaunching. Some resources might leak.
warnings.warn('resource_tracker: process died unexpectedly, '
我没有明确使用错误中报告的
-s
或 -c
。该错误来自多处理库中的以下代码(方法 - ensure_running
)
warnings.warn('resource_tracker: process died unexpectedly, '
'relaunching. Some resources might leak.')
如何解决这个问题?
# updated Python code
# ---------------------- mp_app.py ------------------
import argparse
import logging
import signal
import sys
import time
import multiprocessing as mp
from dataclasses import dataclass
from typing import Dict, NoReturn
import numpy as np
from mp_utils import init_worker_processes
@dataclass
class TmpData:
    """Payload handed from the parent process to a pool worker."""

    # Key identifying the work item.
    name: str
    # Number of seconds the worker sleeps for this item.
    value: int
def worker(name: str, data: TmpData) -> NoReturn:
    """Log the work item, then simulate work by sleeping ``data.value`` seconds."""
    pool_logger = mp.get_logger()
    pool_logger.info(f"processing : {name}; value: {data.value}")
    time.sleep(data.value)
def get_args(logger: logging.Logger) -> argparse.Namespace:
    """Parse and return the command-line options for the test MP app.

    Args:
        logger: used to report parser failures before re-raising.
    """
    parser = argparse.ArgumentParser(description="test MP app")
    parser.add_argument(
        "-m", "--max-time",
        type=int, dest="max_time", required=True,
        help="max timeout in seconds",
    )
    # Hidden tuning knob: number of pool workers.
    parser.add_argument(
        "-j",
        dest="num_workers", type=int, default=1, required=False,
        help=argparse.SUPPRESS,
    )
    try:
        args = parser.parse_args()
    except argparse.ArgumentError as err:
        # NOTE(review): parse_args() normally exits via SystemExit rather than
        # raising ArgumentError, so this handler may never fire — confirm intent.
        logger.exception(parser.print_help())
        raise err
    return args
def mp_app(options: argparse.Namespace, logger: logging.Logger) -> NoReturn:
    """Fan a fixed set of keys out to a pool of worker processes.

    Args:
        options: parsed CLI options; reads ``max_time`` and ``num_workers``.
        logger: used to report failures of individual tasks.
    """
    keys = ["ABC", "DEF", "GHI", "JKL", "PQR", "STU", "XYZ"]
    map_data: Dict[str, TmpData] = {
        key: TmpData(name=key, value=np.random.randint(1, options.max_time))
        for key in keys
    }
    ctx = mp.get_context("fork")
    with ctx.Pool(
        processes=options.num_workers,
        initializer=init_worker_processes,
    ) as pool:
        pending = []
        try:
            for key, payload in map_data.items():
                pending.append(pool.apply_async(worker, args=(key, payload)))
        except KeyboardInterrupt:
            # Ctrl+C: stop all workers immediately instead of waiting.
            pool.terminate()
        pool.close()
        pool.join()
        for handle in pending:
            try:
                handle.get()
            except Exception as err:
                logger.error(f"{err}")
if __name__ == "__main__":
    # Entry point: parse args, run the app, exit non-zero on any failure.
    main_logger = logging.getLogger()
    try:
        cli_args = get_args(main_logger)
        mp_app(options=cli_args, logger=main_logger)
    except Exception as e:
        main_logger.error(e)
        raise SystemExit(1) from e
    sys.exit(0)
# --------------------- mp_utils.py --------------------------
import multiprocessing
import logging
import signal
from typing import NoReturn
def init_worker_processes() -> NoReturn:
    """Pool initializer: route this worker's logs to stderr and ignore SIGINT.

    Returns:
        None
    """
    worker_logger = multiprocessing.log_to_stderr()
    worker_logger.setLevel(logging.INFO)
    # Let only the parent process react to Ctrl+C; workers ignore SIGINT.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
请注意,主要问题似乎是“-s”和“-c”选项无法识别;不确定那些来自哪里。
虽然我仍在尝试破译 cython 构建过程,因为它是通过我们环境中的复杂 make 系统发生的。但是,我猜测我能够追踪 -s 和 -c 选项的根本原因。
-s
似乎来自subprocess.py(模块subprocess)中的_args_from_interpreter_flags
方法。
在我的 python shell 中,我看到 sys.flags 如下 -
>>> sys.flags sys.flags(debug=0, inspect=0, interactive=0, optimize=0, dont_write_bytecode=0, no_user_site=1, no_site=0, ignore_environment=0, verbose=0, bytes_warning=0, quiet=0, hash_randomization=1, isolated=0, dev_mode=False, utf8_mode=0, int_max_str_digits=-1)
由于 sys.flags.no_user_site 为 1,因此
-s
似乎已被附加。
spawn.py 中的 get_command_line
似乎添加了 -c
。由于该调用来自 if getattr(sys, 'frozen', False)
判断的 else 分支,难道 spawn 启动方法不应该与 cythonized 二进制文件一起使用吗?
我尝试了“fork”和“spawn”。两者都可以在 Python 中运行。但是,通过 cythonized 构建,基于“spawn”的应用程序,我收到“UserWarning:resource_tracker:进程意外死亡,正在重新启动。某些资源可能泄漏”消息以及 -s 和 -c 的“无法识别的参数”。基于“fork”的应用程序的 cythonized 版本只是在启动时挂起,就好像它在等待某个锁一样。我尝试过 pstack 进程 ID,但找不到任何东西 -
# top 20 frames from pstack
#0 0x00007ffff799675d in read () from /usr/lib64/libpthread.so.0
#1 0x00007ffff70c3996 in _Py_read (fd=fd@entry=3, buf=0x7fffbabfdbf0, count=count@entry=4) at Python/fileutils.c:1707
#2 0x00007ffff70ce872 in os_read_impl (module=<optimized out>, length=4, fd=3) at ./Modules/posixmodule.c:9474
#3 os_read (module=<optimized out>, nargs=<optimized out>, args=<optimized out>) at ./Modules/clinic/posixmodule.c.h:5012
#4 os_read (module=<optimized out>, args=<optimized out>, nargs=<optimized out>) at ./Modules/clinic/posixmodule.c.h:4977
#5 0x00007ffff6fc444f in cfunction_vectorcall_FASTCALL (func=0x7ffff7f4aa90, args=0x7fffbae68f10, nargsf=<optimized out>, kwnames=<optimized out>) at Objects/methodobject.c:430
#6 0x00007ffff6f303ec in _PyObject_VectorcallTstate (kwnames=0x0, nargsf=<optimized out>, args=0x7fffbae68f10, callable=0x7ffff7f4aa90, tstate=0x419cb0) at ./Include/cpython/abstract.h:118
#7 PyObject_Vectorcall (kwnames=0x0, nargsf=<optimized out>, args=0x7fffbae68f10, callable=<optimized out>) at ./Include/cpython/abstract.h:127
#8 call_function (kwnames=0x0, oparg=<optimized out>, pp_stack=<synthetic pointer>, tstate=0x419cb0) at Python/ceval.c:5077
#9 _PyEval_EvalFrameDefault (tstate=<optimized out>, f=<optimized out>, throwflag=<optimized out>) at Python/ceval.c:3520
#10 0x00007ffff7066ea4 in _PyEval_EvalFrame (throwflag=0, f=0x7fffbae68d60, tstate=0x419cb0) at ./Include/internal/pycore_ceval.h:40
#11 _PyEval_EvalCode (tstate=tstate@entry=0x419cb0, _co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, args=<optimized out>, argcount=2, kwnames=0x0, kwargs=0x7fffbac0b750, kwcount=0, kwstep=1, defs=0x7fffbae95298, defcount=1, kwdefs=0x0, closure=0x0, name=0x7fffbae8d230, qualname=0x7fffbae8c210) at Python/ceval.c:4329
#12 0x00007ffff6f7baba in _PyFunction_Vectorcall (func=<optimized out>, stack=<optimized out>, nargsf=<optimized out>, kwnames=<optimized out>) at Objects/call.c:396
#13 0x00007ffff6f311aa in _PyObject_VectorcallTstate (kwnames=0x0, nargsf=<optimized out>, args=0x7fffbac0b740, callable=0x7fffbabefe50, tstate=0x419cb0) at ./Include/cpython/abstract.h:118
#14 PyObject_Vectorcall (kwnames=0x0, nargsf=<optimized out>, args=0x7fffbac0b740, callable=<optimized out>) at ./Include/cpython/abstract.h:127
#15 call_function (kwnames=0x0, oparg=<optimized out>, pp_stack=<synthetic pointer>, tstate=0x419cb0) at Python/ceval.c:5077
#16 _PyEval_EvalFrameDefault (tstate=<optimized out>, f=<optimized out>, throwflag=<optimized out>) at Python/ceval.c:3506
#17 0x00007ffff7066ea4 in _PyEval_EvalFrame (throwflag=0, f=0x7fffbac0b5b0, tstate=0x419cb0) at ./Include/internal/pycore_ceval.h:40
#18 _PyEval_EvalCode (tstate=tstate@entry=0x419cb0, _co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, args=<optimized out>, argcount=2, kwnames=0x0, kwargs=0x7fffbac08f58, kwcount=0, kwstep=1, defs=0x7fffbae83b08, defcount=1, kwdefs=0x0, closure=0x0, name=0x7fffbae84830, qualname=0x7fffbae8c3f0) at Python/ceval.c:4329
#19 0x00007ffff6f7baba in _PyFunction_Vectorcall (func=<optimized out>, stack=<optimized out>, nargsf=<optimized out>, kwnames=<optimized out>) at Objects/call.c:396
#20 0x00007ffff6f311aa in _PyObject_VectorcallTstate (kwnames=0x0, nargsf=<optimized out>, args=0x7fffbac08f48, callable=0x7fffbabeff70, tstate=0x419cb0) at ./Include/cpython/abstract.h:118
我检查了 cython 构建过程打印如下内容:
cython --3str --embed --no-docstrings -o mp_app.c mp_app.py
gcc -Os -Loss/Python3/lib -DNDEBUG -Wl,--strip-all -IPython-3.9.15/include/python3.9 -LPython-3.9.15/lib/python3.9/config-3.9-x86_64-linux-gnu -LPython-3.9.15/lib -lcrypt -lpthread -ldl -lutil -lm -lm -B /binutils/bin -static-libgcc -static-libstdc++ -fPIC -lpython3.9 mp_app.c -o mp_app.pex
PS:我还编辑了源代码示例
我能够通过从
fork
启动方法切换到 spawn
启动方法来解决该问题。此外,我必须将 worker
方法从 mp_app.py
移动到 mp_utils.py
,否则在 cythonized 版本中,它会为 cythonized 函数 PicklingError
抛出 worker
。
我仍然不确定为什么
spawn
启动方法在CentOS7机器上对我不起作用。
最终代码大致如下:
# ------------- mp_app.py ------------------
import argparse
import logging
import signal
import sys
import multiprocessing as mp
from typing import Dict, NoReturn
import numpy as np
from mp_utils import (init_worker_processes, worker_task, InterProcessData)
def get_args(logger: logging.Logger) -> argparse.Namespace:
    """Build the CLI parser and return the parsed options."""
    parser = argparse.ArgumentParser(description="test MP app")
    parser.add_argument("-m", "--max-time", type=int, dest="max_time",
                        required=True, help="max timeout in seconds")
    # Hidden option: number of pool workers.
    parser.add_argument("-j", dest="num_workers", type=int, default=1,
                        required=False, help=argparse.SUPPRESS)
    try:
        parsed = parser.parse_args()
    except argparse.ArgumentError as err:
        # NOTE(review): argparse usually exits via SystemExit, not
        # ArgumentError — this handler may be dead code; confirm.
        logger.exception(parser.print_help())
        raise err
    return parsed
def mp_app(options: argparse.Namespace, logger: logging.Logger) -> NoReturn:
    """Fan a fixed set of keys out to a pool of worker processes.

    Args:
        options: parsed CLI options; reads ``max_time`` and ``num_workers``.
        logger: used to report failures of individual tasks.
    """
    map_data: Dict[str, InterProcessData] = {
        key: InterProcessData(name=key, value=np.random.randint(1, options.max_time))
        for key in ["ABC", "DEF", "GHI", "JKL", "PQR", "STU", "XYZ"]
    }
    # FIX: use the "spawn" start method. The stated solution to the original
    # problem was switching from "fork" to "spawn" (with the picklable
    # worker_task moved into mp_utils), but this line still said "fork" —
    # the cythonized fork-based build hangs at startup.
    with mp.get_context("spawn").Pool(
        processes=options.num_workers,
        initializer=init_worker_processes,
    ) as pool:
        results = []
        try:
            for key in map_data:
                results.append(
                    pool.apply_async(worker_task, args=(key, map_data[key]))
                )
        except KeyboardInterrupt:
            # Ctrl+C: stop all workers immediately instead of waiting.
            pool.terminate()
        pool.close()
        pool.join()
        for result in results:
            try:
                result.get()
            except Exception as err:
                logger.error(f"{err}")
if __name__ == "__main__":
    # Entry point: run the app and exit non-zero on any failure.
    main_logger = logging.getLogger()
    try:
        mp_app(options=get_args(main_logger), logger=main_logger)
    except Exception as e:
        main_logger.error(e)
        raise SystemExit(1) from e
    sys.exit(0)
# ---------- mp_utils.py -----------
import time
import logging
import signal
import multiprocessing
from dataclasses import dataclass
from typing import NoReturn
@dataclass
class InterProcessData:
    """Work item pickled from the parent process to a pool worker."""

    # Key identifying the work item.
    name: str
    # Seconds worker_task sleeps for this item.
    value: int
def worker_task(name: str, data: InterProcessData) -> NoReturn:
    """Log the item being processed, then sleep ``data.value`` seconds."""
    mp_logger = multiprocessing.get_logger()
    mp_logger.info(f"processing : {name}; value: {data.value}")
    time.sleep(data.value)
def init_worker_processes() -> NoReturn:
    """Pool initializer: send worker logs to stderr and ignore SIGINT."""
    stderr_logger = multiprocessing.log_to_stderr()
    stderr_logger.setLevel(logging.INFO)
    # Only the parent should react to Ctrl+C.
    signal.signal(signal.SIGINT, signal.SIG_IGN)