在类中使用 Python 多重处理时修复“TypeError:无法 pickle 'weakref' 对象”

问题描述 投票:0回答:1

我目前正在开发一个 Python FastAPI 应用程序,该应用程序使用多处理库在并行线程中生成繁忙的循环。当我第一次开始构建应用程序时,我在启动期间在 main.py 文件中创建了线程。这非常有效,我可以毫无问题地生成我需要的所有线程。

最终我需要特定的 FastAPI 端点也能够控制这些线程,因此我创建了一个可以控制线程并可由启动进程和 FastAPI 端点访问的单例类文件,而不是在应用程序启动时产生威胁。

问题是,一旦我将 Process 方法从 main.py 文件移至 .core.thread_utils.py 文件,当我生成多个线程时,我就会收到以下错误:

  File "/home/user/.local/lib/python3.8/site-packages/starlette/routing.py", line 677, in lifespan
    async with self.lifespan_context(app) as maybe_state:
  File "/home/user/.local/lib/python3.8/site-packages/starlette/routing.py", line 566, in __aenter__
    await self._router.startup()
  File "/home/user/.local/lib/python3.8/site-packages/starlette/routing.py", line 654, in startup
    await handler()
  File "/home/user/Projects/test/test-api/api/main.py", line 127, in startup_event
    thread_utils.launch_all_threads()
  File "/home/user/Projects/test/test-api/api/core/thread_utils.py", line 66, in launch_all_threads
    p.start()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/usr/lib/python3.8/multiprocessing/context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "/usr/lib/python3.8/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/usr/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/usr/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 47, in _launch
    reduction.dump(process_obj, fp)
  File "/usr/lib/python3.8/multiprocessing/reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle 'weakref' object

因此,在 main.py 中运行良好的代码一旦放入单独的类中,现在就会抛出错误。另外,我可以毫无问题地生成 one 线程,但是 more 线程会抛出“无法 pickle”类型错误。我不明白为什么。

在对 Python 如何分叉新线程进行一些研究后,我相信我遇到了 这个错误。另外,多处理文档提到“安全导入”,但我不确定这是否/如何适用于这种情况,因为尝试应用它会产生相同的错误或根本没有线程。我可能做错了......

我使用的代码看起来像这样:

main.py:

from .core.thread_utils import thread_utils
thread_utils = thread_utils()
...
@app.on_event("startup") # this is part of FastAPI
async def startup_event():
    # previous thread creation went here and worked perfectly
    thread_utils.launch_all_threads()

核心/thread_utils.py:

from multiprocessing import Process
from ..core import busy_loop

class thread_utils:
    _self = None

    def __new__(cls):
        if cls._self is None:
            cls._self = super().__new__(cls)
            return cls._self

    def launch_runner_thread(self, t:int):
        busy_loop(t)

    def launch_all_threads(self):
        t_list = [1,2]
        for t in t_list:
            p = Process(target=self.launch_runner_thread, args=(t,))
            p.start()

任何关于如何在单独的类中实现多处理的帮助、指导或工作示例将非常感激!

工作代码(感谢下面@pts的建议!)

main.py

from .core.thread_utils import launch_all_threads
...
@app.on_event("startup")
async def startup_event():
    launch_all_threads()

核心/thread_utils.py:

from multiprocessing import Process
from ..core import busy_loop

def launch_runner_thread(t:int):
    busy_loop(t)

def launch_all_threads():
    t_list = [1,2]
    for t in t_list:
        p = Process(target=launch_runner_thread, args=(t,))
        p.start()

我将继续更新此问题的标题以匹配问题/解决方案,并将回答归功于@pts。再次感谢您!

python class multiprocessing fastapi
1个回答
1
投票

self.
调用中删除
Process(target=self....)
,并将被调用的函数移到类之外。这应该可以解决类型错误。

© www.soinside.com 2019 - 2024. All rights reserved.