如何在 Django 应用程序中安全地使用多处理?

问题描述 投票:0回答:1

我读过 docs ,建议多处理可能会在 Django 应用程序或 Windows 上导致意外的副作用,尤其是那些连接到多个数据库的应用程序。具体来说,我使用函数

load_to_table
从 DataFrame 创建多个 CSV 文件,然后使用多重处理将数据加载到 PostgreSQL 表中。该函数深度集成在我的 Django 应用程序中,而不是一个独立的脚本。

我担心如果在生产中使用此代码可能会产生潜在的长期影响。此外,

if __name__ == '__main__':
似乎不适用于 Django 的深层文件/函数。这是因为 Django 的管理命令是在不同的上下文中执行的,其中
__name__
未设置为
"__main__"
,这会阻止此块按预期执行。此外,多处理指南建议使用
if __name__ == '__main__':
安全地初始化多处理任务,因为它可以确保代码不会意外执行多次,特别是在 Windows 等平台上,在子进程中重新导入模块级代码。

这是我正在使用的代码:

import os
import glob
from multiprocessing import Pool, cpu_count
from functools import partial
from portal.db_postgresql.connection import Connection

def copy_to_table(file_name: str, table_name: str, columns: list):
    # custom connection class
    connection_obj = Connection(get_current_db_name(), 1, 1)
    connection = connection_obj.connection()
    cursor = connection.cursor()
    with open(file_name, "r") as f:
        cursor.copy_from(f, table_name, sep=",", columns=columns, null="")
    connection.commit()
    connection.close()
    return file_name

# df_ops is a custom PySpark dataframe class
def load_to_table(df_ops: PySparkOperations, table_name: str) -> dict:
    filepath = os.path.join("uploaded_files", table_name)
    df_ops.df.repartition(10).write.mode("overwrite").format("csv").option("header", "false").save(filepath)
    file_path_list = sorted(glob.glob(f"{filepath}/*.csv"))
    with Pool(cpu_count()) as p:
        p.map(partial(copy_to_table, table_name=table_name, columns=df_ops.df.columns), file_path_list)
    return df_ops.count

上面的函数不适用于 VS Code 调试器,很可能是因为

debugpy
,它会干扰 Django 的多处理。但是,它可以与
runserver
一起使用。当我使用 VS Code 调试器运行 Django 应用程序时,在执行函数时遇到以下error。它似乎在循环运行。

File "/usr/lib/python3.11/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
                  ^^^^^^^^^^^^^^^^^
  File "/home/rhythmflow/Desktop/Reconciliation/reconciliation-backend-v3/portal/operations/load_data/methods.py", line 225, in load_to_table
    with Pool(cpu_count()) as p:
         ^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/multiprocessing/context.py", line 281, in _Popen
    return Popen(process_obj)
           ^^^^^^^^^^^^^^^^^^
  File "/home/rhythmflow/Desktop/Reconciliation/reconciliation-backend-v3/portal/operations/load_data/load_data.py", line 71, in start
    load_to_table(df_ops, self.source_tmp_details)
  File "/usr/lib/python3.11/multiprocessing/context.py", line 119, in Pool
    return Pool(processes, initializer, initargs, maxtasksperchild,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

File "/home/rhythmflow/.vscode/extensions/ms-python.debugpy-2024.10.0-linux-x64/bundled/libs/debugpy/_vendored/pydevd/pydevd.py", line 838, in wait_for_ready_to_run
    self._py_db_command_thread_event.wait(0.1)
  File "/usr/lib/python3.11/threading.py", line 629, in wait
    signaled = self._cond.wait(timeout)
               ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/threading.py", line 331, in wait
    gotit = waiter.acquire(True, timeout)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/rhythmflow/Desktop/Reconciliation/reconciliation-backend-v3/.venv/lib/python3.11/site-packages/django/utils/autoreload.py", line 664, in <lambda>
    signal.signal(signal.SIGTERM, lambda *args: sys.exit(0))
SystemExit: 0
[22/Aug/2024 15:04:30] "POST /start-process/ HTTP/1.1" 500 59
[22/Aug/2024 15:04:35,063] - Broken pipe from ('127.0.0.1', 51102)

可能导致此问题的原因是什么?如何在使用 VS Code 调试器时解决该问题?

python django debugging multiprocessing
1个回答
0
投票

在 Django 中使用纯多处理将导致其内部服务器在分叉/生成的进程中启动,并尝试将其自身与实际工作线程用于侦听 HTTP 传入请求的相同套接字资源绑定。

听起来够混乱的。因此,除非有一个 django-docs 官方页面告诉我们在整个过程中安全使用 Django 多处理的方法,否则我不会走这条路线。

但是,还有其他路线可以使用与面向 Web 的工作人员相同的代码库来进行紧密耦合过程,以同时运行 - 只需更改入口点,以便 django 不会尝试提供 HTTP 服务 - 我相信 django 管理脚本 - 一种在代码库中编写函数(通常从 CLI 调用)的方法,是能够连接到数据库和其他资源、使用模型类等的好方法,并使用 Django 本身在另一个进程中创建的其他数据库连接来执行此操作,知道它们位于其他进程中。

您应该使用

multiprocessing
 来调用您想要在进程外执行的任何操作,而不是使用 
subprocess.Popen
,因为它会隐式地重新加载或克隆您的 Python 应用程序(包括侦听 HTTP)一个管理脚本。调用它应该很简单,就像从 CLI 中所做的那样,传递它应作为命令行参数处理的模型实例的任何 ID。通信有点复杂(尽管您可以在数据库上创建一个特殊的模型,并使用该模型的实例在面向 Web 的进程和工作进程之间交换消息)

否则,我过去曾使用过 Celery 让进程外工作人员使用相同的 django 代码库。一旦你设置好它,它可能会更容易使用 - 工作人员必须作为 celery 配置的一部分单独启动 - 但除此之外,在这些工作人员中调用远程函数变得轻而易举。查找有关如何使用 celery 以及如何将 django 与 Celery 结合使用的文档

© www.soinside.com 2019 - 2024. All rights reserved.