在类中使用Python多重处理的正确方法是什么

问题描述 投票:0回答:1

我目前正在开发一个 Python FastAPI 应用程序,该应用程序使用多处理库在并行线程中生成繁忙的循环。当我第一次开始构建应用程序时,我在启动期间在 main.py 文件中创建了线程。这非常有效,我可以毫无问题地生成我需要的所有线程。

最终我需要特定的 FastAPI 端点也能够控制这些线程,因此我创建了一个可以控制线程并可由启动进程和 FastAPI 端点访问的单例类文件,而不是在应用程序启动时产生威胁。

问题是,一旦我将 Process 方法从 main.py 文件移至 .core.thread_utils.py 文件,当我生成多个线程时,我就会收到以下错误:

  ...
  File "/home/user/Projects/test/test-api/api/core/thread_utils.py", line 66, in launch_all_threads
    p.start()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/usr/lib/python3.8/multiprocessing/context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "/usr/lib/python3.8/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/usr/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/usr/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 47, in _launch
    reduction.dump(process_obj, fp)
  File "/usr/lib/python3.8/multiprocessing/reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle 'weakref' object

因此,在 main.py 中运行良好的代码一旦放入单独的类中,现在就会抛出错误。另外,我可以毫无问题地生成 one 线程,但是 more 线程会抛出“无法 pickle”类型错误。我不明白为什么。

在对 Python 如何分叉新线程进行一些研究后,我相信我遇到了 这个错误。另外,多处理文档提到“安全导入”,但我不确定这是否/如何适用于这种情况,因为尝试应用它会产生相同的错误或根本没有线程。我可能做错了......

我使用的代码看起来像这样:

main.py:

from .core.thread_utils import thread_utils
thread_utils = thread_utils()
...
@app.on_event("startup") # this is part of FastAPI
async def startup_event():
    # previous thread creation went here and worked perfectly
    thread_utils.launch_all_threads()

核心/thread_utils.py:

from multiprocessing import Process
from ..core import busy_loop

class thread_utils:
    _self = None

    def __new__(cls):
        if cls._self is None:
            cls._self = super().__new__(cls)
            return cls._self

    def launch_runner_thread(self, t:int):
        busy_loop(t)

    def launch_all_threads(self):
        t_list = [1,2]
        for t in t_list:
            p = Process(target=self.launch_runner_thread, args=(t,))
            p.start()

任何关于如何在单独的类中实现多处理的帮助、指导或工作示例将非常感激!

python class multiprocessing fastapi
1个回答
0
投票

self.
调用中删除
Process(target=self....)
,并将被调用的函数移到类之外。这应该可以解决类型错误。

© www.soinside.com 2019 - 2024. All rights reserved.