我目前正在开发一个 Python FastAPI 应用程序,该应用程序使用多处理库在并行线程中生成繁忙的循环。当我第一次开始构建应用程序时,我在启动期间在 main.py 文件中创建了线程。这非常有效,我可以毫无问题地生成我需要的所有线程。
最终我需要特定的 FastAPI 端点也能够控制这些线程,因此我创建了一个可以控制线程并可由启动进程和 FastAPI 端点访问的单例类文件,而不是在应用程序启动时产生威胁。
问题是,一旦我将 Process 方法从 main.py 文件移至 .core.thread_utils.py 文件,当我生成多个线程时,我就会收到以下错误:
...
File "/home/user/Projects/test/test-api/api/core/thread_utils.py", line 66, in launch_all_threads
p.start()
File "/usr/lib/python3.8/multiprocessing/process.py", line 121, in start
self._popen = self._Popen(self)
File "/usr/lib/python3.8/multiprocessing/context.py", line 224, in _Popen
return _default_context.get_context().Process._Popen(process_obj)
File "/usr/lib/python3.8/multiprocessing/context.py", line 284, in _Popen
return Popen(process_obj)
File "/usr/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 32, in __init__
super().__init__(process_obj)
File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 19, in __init__
self._launch(process_obj)
File "/usr/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 47, in _launch
reduction.dump(process_obj, fp)
File "/usr/lib/python3.8/multiprocessing/reduction.py", line 60, in dump
ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle 'weakref' object
因此,在 main.py 中运行良好的代码一旦放入单独的类中,现在就会抛出错误。另外,我可以毫无问题地生成 one 线程,但是 more 线程会抛出“无法 pickle”类型错误。我不明白为什么。
在对 Python 如何分叉新线程进行一些研究后,我相信我遇到了 这个错误。另外,多处理文档提到“安全导入”,但我不确定这是否/如何适用于这种情况,因为尝试应用它会产生相同的错误或根本没有线程。我可能做错了......
我使用的代码看起来像这样:
main.py:
from .core.thread_utils import thread_utils
thread_utils = thread_utils()
...
@app.on_event("startup") # this is part of FastAPI
async def startup_event():
# previous thread creation went here and worked perfectly
thread_utils.launch_all_threads()
核心/thread_utils.py:
from multiprocessing import Process
from ..core import busy_loop
class thread_utils:
_self = None
def __new__(cls):
if cls._self is None:
cls._self = super().__new__(cls)
return cls._self
def launch_runner_thread(self, t:int):
busy_loop(t)
def launch_all_threads(self):
t_list = [1,2]
for t in t_list:
p = Process(target=self.launch_runner_thread, args=(t,))
p.start()
任何关于如何在单独的类中实现多处理的帮助、指导或工作示例将非常感激!
从
self.
调用中删除 Process(target=self....)
,并将被调用的函数移到类之外。这应该可以解决类型错误。