"process died unexpectedly" with the cythonized version of multiprocessing code

Problem description

This is a spin-off of this question. The code runs fine in plain Python. When I tried the cythonized version, I started getting "Can't pickle ..." even though init_worker_processes is defined at the top level. So I moved it to another module and imported init_worker_processes from there. Now I get the following error:

error: unrecognized arguments: -s -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=8, pipe_handle=16) --multiprocessing-fork
Python3/lib/python3.9/multiprocessing/resource_tracker.py:96: UserWarning: resource_tracker: process died unexpectedly, relaunching.  Some resources might leak.
  warnings.warn('resource_tracker: process died unexpectedly, '

I am not explicitly passing the -s and -c options reported in the error. The warning comes from the following code in the multiprocessing library (the ensure_running method):

warnings.warn('resource_tracker: process died unexpectedly, '
                              'relaunching.  Some resources might leak.')

How can I fix this?

# updated Python code
# ---------------------- mp_app.py ------------------
import argparse
import logging
import signal
import sys
import time

import multiprocessing as mp
from dataclasses import dataclass
from typing import Dict, NoReturn

import numpy as np

from mp_utils import init_worker_processes 


@dataclass
class TmpData:
    name: str
    value: int


def worker(name: str, data: TmpData) -> NoReturn:
    logger_obj = mp.get_logger()
    logger_obj.info(f"processing : {name}; value: {data.value}")

    time.sleep(data.value)


def get_args(logger: logging.Logger) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="test MP app")
    parser.add_argument(
        "-m",
        "--max-time",
        type=int,
        dest="max_time",
        required=True,
        help="max timeout in seconds",
    )

    parser.add_argument(
        "-j",
        dest="num_workers",
        type=int,
        default=1,
        required=False,
        help=argparse.SUPPRESS,
    )

    try:
        args = parser.parse_args()
    except argparse.ArgumentError as err:
        logger.exception(parser.print_help())
        raise err

    return args


def mp_app(options: argparse.Namespace, logger: logging.Logger) -> NoReturn:
    map_data: Dict[str, TmpData] = {
        key: TmpData(name=key, value=np.random.randint(1, options.max_time))
        for key in ["ABC", "DEF", "GHI", "JKL", "PQR", "STU", "XYZ"]
    }

    with mp.get_context("fork").Pool(
        processes=options.num_workers,
        initializer=init_worker_processes,
    ) as pool:
        results = []
        for key in map_data:
            try:
                results.append(
                    pool.apply_async(
                        worker,
                        args=(
                            key,
                            map_data[key],
                        ),
                    )
                )
            except KeyboardInterrupt:
                pool.terminate()

        pool.close()
        pool.join()

        for result in results:
            try:
                result.get()
            except Exception as err:
                logger.error(f"{err}")


if __name__ == "__main__":
    main_logger = logging.getLogger()

    try:
        args = get_args(main_logger)
        mp_app(options=args, logger=main_logger)
    except Exception as e:
        main_logger.error(e)
        raise SystemExit(1) from e

    sys.exit(0)

# --------------------- mp_utils.py --------------------------

import multiprocessing
import logging
import signal

from typing import NoReturn


def init_worker_processes() -> NoReturn:
    """
    Initializes each worker to handle signals
    Returns:
        None
    """
    this_process_logger = multiprocessing.log_to_stderr()
    this_process_logger.setLevel(logging.INFO)
    signal.signal(signal.SIGINT, signal.SIG_IGN)

Note that the main problem seems to be that the -s and -c options are not recognized; I am not sure where those come from.

Edit 2:

I am still trying to decipher the cython build process, since it happens through a complex make system in our environment. However, I think I have tracked down where the -s and -c options come from.

The -s seems to come from the _args_from_interpreter_flags function in subprocess.py (the subprocess module).

In my Python shell, sys.flags looks like this:

>>> sys.flags
sys.flags(debug=0, inspect=0, interactive=0, optimize=0, dont_write_bytecode=0, no_user_site=1, no_site=0, ignore_environment=0, verbose=0, bytes_warning=0, quiet=0, hash_randomization=1, isolated=0, dev_mode=False, utf8_mode=0, int_max_str_digits=-1)

Since sys.flags.no_user_site is 1, -s appears to get appended.
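
A quick way to confirm this (it calls a private stdlib helper, so treat it purely as a diagnostic):

import subprocess
# prints the extra interpreter flags multiprocessing forwards to child processes;
# with sys.flags.no_user_site == 1 as shown above, this should include '-s'
print(subprocess._args_from_interpreter_flags())   # expected: ['-s']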

The -c seems to be added by get_command_line in spawn.py. Since that code is in the else branch of if getattr(sys, 'frozen', False), should the spawn start method simply not be used with a cythonized binary?
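
For reference, the command line that multiprocessing builds for a spawn child can be inspected directly (again an internal API, shown only to trace the -s/-c arguments; the keyword values are just placeholders):

import multiprocessing.spawn as spawn
# on a regular, non-frozen interpreter this returns something like
# [sys.executable, '-s', '-c',
#  'from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=8, pipe_handle=16)',
#  '--multiprocessing-fork']
print(spawn.get_command_line(tracker_fd=8, pipe_handle=16))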

Edit 3:

I have tried both "fork" and "spawn". Both work in plain Python. However, with the cythonized build, the "spawn"-based app gives the "UserWarning: resource_tracker: process died unexpectedly, relaunching. Some resources might leak." message along with the "unrecognized arguments" error for -s and -c. The cythonized version of the "fork"-based app simply hangs at startup, as if it were waiting on some lock. I tried pstack on the process ID but could not find anything useful:

# top 20 frames from pstack
#0  0x00007ffff799675d in read () from /usr/lib64/libpthread.so.0
#1  0x00007ffff70c3996 in _Py_read (fd=fd@entry=3, buf=0x7fffbabfdbf0, count=count@entry=4) at Python/fileutils.c:1707
#2  0x00007ffff70ce872 in os_read_impl (module=<optimized out>, length=4, fd=3) at ./Modules/posixmodule.c:9474
#3  os_read (module=<optimized out>, nargs=<optimized out>, args=<optimized out>) at ./Modules/clinic/posixmodule.c.h:5012
#4  os_read (module=<optimized out>, args=<optimized out>, nargs=<optimized out>) at ./Modules/clinic/posixmodule.c.h:4977
#5  0x00007ffff6fc444f in cfunction_vectorcall_FASTCALL (func=0x7ffff7f4aa90, args=0x7fffbae68f10, nargsf=<optimized out>, kwnames=<optimized out>) at Objects/methodobject.c:430
#6  0x00007ffff6f303ec in _PyObject_VectorcallTstate (kwnames=0x0, nargsf=<optimized out>, args=0x7fffbae68f10, callable=0x7ffff7f4aa90, tstate=0x419cb0) at ./Include/cpython/abstract.h:118
#7  PyObject_Vectorcall (kwnames=0x0, nargsf=<optimized out>, args=0x7fffbae68f10, callable=<optimized out>) at ./Include/cpython/abstract.h:127
#8  call_function (kwnames=0x0, oparg=<optimized out>, pp_stack=<synthetic pointer>, tstate=0x419cb0) at Python/ceval.c:5077
#9  _PyEval_EvalFrameDefault (tstate=<optimized out>, f=<optimized out>, throwflag=<optimized out>) at Python/ceval.c:3520
#10 0x00007ffff7066ea4 in _PyEval_EvalFrame (throwflag=0, f=0x7fffbae68d60, tstate=0x419cb0) at ./Include/internal/pycore_ceval.h:40
#11 _PyEval_EvalCode (tstate=tstate@entry=0x419cb0, _co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, args=<optimized out>, argcount=2, kwnames=0x0, kwargs=0x7fffbac0b750, kwcount=0, kwstep=1, defs=0x7fffbae95298, defcount=1, kwdefs=0x0, closure=0x0, name=0x7fffbae8d230, qualname=0x7fffbae8c210) at Python/ceval.c:4329
#12 0x00007ffff6f7baba in _PyFunction_Vectorcall (func=<optimized out>, stack=<optimized out>, nargsf=<optimized out>, kwnames=<optimized out>) at Objects/call.c:396
#13 0x00007ffff6f311aa in _PyObject_VectorcallTstate (kwnames=0x0, nargsf=<optimized out>, args=0x7fffbac0b740, callable=0x7fffbabefe50, tstate=0x419cb0) at ./Include/cpython/abstract.h:118
#14 PyObject_Vectorcall (kwnames=0x0, nargsf=<optimized out>, args=0x7fffbac0b740, callable=<optimized out>) at ./Include/cpython/abstract.h:127
#15 call_function (kwnames=0x0, oparg=<optimized out>, pp_stack=<synthetic pointer>, tstate=0x419cb0) at Python/ceval.c:5077
#16 _PyEval_EvalFrameDefault (tstate=<optimized out>, f=<optimized out>, throwflag=<optimized out>) at Python/ceval.c:3506
#17 0x00007ffff7066ea4 in _PyEval_EvalFrame (throwflag=0, f=0x7fffbac0b5b0, tstate=0x419cb0) at ./Include/internal/pycore_ceval.h:40
#18 _PyEval_EvalCode (tstate=tstate@entry=0x419cb0, _co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, args=<optimized out>, argcount=2, kwnames=0x0, kwargs=0x7fffbac08f58, kwcount=0, kwstep=1, defs=0x7fffbae83b08, defcount=1, kwdefs=0x0, closure=0x0, name=0x7fffbae84830, qualname=0x7fffbae8c3f0) at Python/ceval.c:4329
#19 0x00007ffff6f7baba in _PyFunction_Vectorcall (func=<optimized out>, stack=<optimized out>, nargsf=<optimized out>, kwnames=<optimized out>) at Objects/call.c:396
#20 0x00007ffff6f311aa in _PyObject_VectorcallTstate (kwnames=0x0, nargsf=<optimized out>, args=0x7fffbac08f48, callable=0x7fffbabeff70, tstate=0x419cb0) at ./Include/cpython/abstract.h:118
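
For a hang like this, a Python-level stack dump is often more telling than the C frames above. A minimal sketch, assuming SIGUSR1 is otherwise unused in the app, would be to register faulthandler early in mp_app.py:

import faulthandler
import signal

# dump every thread's Python traceback to stderr when the process receives SIGUSR1,
# e.g. via  kill -USR1 <pid>  from another shell
faulthandler.register(signal.SIGUSR1, all_threads=True)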

I checked, and the cython build process prints the following:

cython --3str --embed --no-docstrings -o mp_app.c mp_app.py
gcc -Os -Loss/Python3/lib -DNDEBUG -Wl,--strip-all -IPython-3.9.15/include/python3.9 -LPython-3.9.15/lib/python3.9/config-3.9-x86_64-linux-gnu -LPython-3.9.15/lib -lcrypt -lpthread -ldl -lutil -lm -lm -B /binutils/bin  -static-libgcc -static-libstdc++ -fPIC  -lpython3.9 mp_app.c -o mp_app.pex
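
One thing that may be worth checking inside the resulting binary (assumptions: with cython --embed, sys.executable resolves to the embedded binary itself, and sys.frozen is never set, so get_command_line takes the regular '-c ...' branch):

import sys

print("executable:", sys.executable)              # likely the mp_app.pex binary itself
print("frozen:", getattr(sys, "frozen", False))   # expected False for a cython --embed build

If that is the case, a spawn child would re-execute mp_app.pex with the '-s -c ...' arguments, which is exactly what argparse then rejects.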

PS: I have also edited the source code example above.

python python-3.x multiprocessing cython python-multiprocessing
1 Answer

I was able to work around the problem by switching from the spawn start method to the fork start method. In addition, I had to move the worker function from mp_app.py to mp_utils.py (renamed to worker_task below); otherwise the cythonized version raised a PicklingError for the cythonized worker function.

I am still not sure why the spawn start method does not work for me on the CentOS 7 machine.
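
A minimal sketch of why the worker had to move into an importable module (using the names from the final code below): Pool tasks are pickled even with the fork context, and plain functions pickle by reference, i.e. as module name plus qualified name, so the target has to be importable under that name in the worker process.

import pickle
from mp_utils import worker_task

payload = pickle.dumps(worker_task)            # serialized as a reference to mp_utils.worker_task
print(pickle.loads(payload) is worker_task)    # True
# The same function defined only in the cythonized mp_app __main__ is what
# raised the PicklingError mentioned above.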

The final code looks roughly like this:

# ------------- mp_app.py ------------------
import argparse
import logging
import signal
import sys


import multiprocessing as mp
from typing import Dict, NoReturn

import numpy as np

from mp_utils import (init_worker_processes, worker_task, InterProcessData)


def get_args(logger: logging.Logger) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="test MP app")
    parser.add_argument(
        "-m",
        "--max-time",
        type=int,
        dest="max_time",
        required=True,
        help="max timeout in seconds",
    )

    parser.add_argument(
        "-j",
        dest="num_workers",
        type=int,
        default=1,
        required=False,
        help=argparse.SUPPRESS,
    )

    try:
        args = parser.parse_args()
    except argparse.ArgumentError as err:
        logger.exception(parser.print_help())
        raise err

    return args


def mp_app(options: argparse.Namespace, logger: logging.Logger) -> NoReturn:
    map_data: Dict[str, InterProcessData] = {
        key: InterProcessData(name=key, value=np.random.randint(1, options.max_time))
        for key in ["ABC", "DEF", "GHI", "JKL", "PQR", "STU", "XYZ"]
    }

    with mp.get_context("fork").Pool(
        processes=options.num_workers,
        initializer=init_worker_processes,
    ) as pool:
        results = []
        for key in map_data:
            try:
                results.append(
                    pool.apply_async(
                        worker_task,
                        args=(
                            key,
                            map_data[key],
                        ),
                    )
                )
            except KeyboardInterrupt:
                pool.terminate()

        pool.close()
        pool.join()

        for result in results:
            try:
                result.get()
            except Exception as err:
                logger.error(f"{err}")


if __name__ == "__main__":
    main_logger = logging.getLogger()

    try:
        args = get_args(main_logger)
        mp_app(options=args, logger=main_logger)
    except Exception as e:
        main_logger.error(e)
        raise SystemExit(1) from e

    sys.exit(0)


# ---------- mp_utils.py -----------
import time
import logging
import signal
import multiprocessing
from dataclasses import dataclass
from typing import NoReturn


@dataclass
class InterProcessData:
    name: str
    value: int


def worker_task(name: str, data: InterProcessData) -> NoReturn:
    logger_obj = multiprocessing.get_logger()
    logger_obj.info(f"processing : {name}; value: {data.value}")

    time.sleep(data.value)


def init_worker_processes() -> NoReturn:
    this_process_logger = multiprocessing.log_to_stderr()
    this_process_logger.setLevel(logging.INFO)
    signal.signal(signal.SIGINT, signal.SIG_IGN)