How to initialize a pool of worker processes with database connections in Python


I am trying to set up a pool of worker processes in which each process has its own set of database connection pools.

Here is the class definition:


class MultiprocessingManager:
    _instance = None  # Class variable to hold the singleton instance

    def __new__(cls, local_ip, linked_ip, server_ip):
        """
            Ensures that MultiprocessingManager is only created once
        """
        if not hasattr(cls, '_instance') or cls._instance is None:
            cls._instance = super(MultiprocessingManager, cls).__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self, local_ip, linked_ip, server_ip):
        if not self._initialized:
            print("Initializing MultiprocessingManager instance...")
            self.pool = Pool(
                processes=multiprocessing.cpu_count(),
                initializer=self._init_worker(local_ip=local_ip, linked_ip=linked_ip, server_ip=server_ip)
            )
            self._initialized = True
            print("MultiprocessingManager instance initialized.")

Here is the _init_worker method that tries to establish the connections:

    def _init_worker(self, local_ip, linked_ip, server_ip):
        """
            Used to initialize each PID worker with dedicated DBManagers
        """
        local_connection = DatabaseManager(
            host_ip=local_ip,
            host_username='something',
            db_password='something',
            db_name='something',
            db_ip=local_ip,
            db_port=3306,
            db_pool_name='local_db_pool',
            db_pool_size=5
        )
        linked_connection = DatabaseManager(
            host_ip=linked_ip,
            host_username='something',
            db_password='something',
            db_name='something',
            db_ip=linked_ip,
            db_port=3306,
            db_pool_name='linked_db_pool',
            db_pool_size=5
        )
        server_connection = DatabaseManager(
            host_ip=server_ip,
            host_username='something',
            db_password='something',
            db_name='something',
            db_ip=server_ip,
            db_port=3306,
            db_pool_name='server_db_pool',
            db_pool_size=5
        )

This is how the DatabaseManager class sets up its connection to MySQL:

            print('DatabaseManager instantiation...')
            self.host_ip = host_ip
            self.host_username = host_username
            self.db_password = db_password
            self.db_name = db_name
            self.db_ip = db_ip
            self.db_port = db_port
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self.socket.connect((self.db_ip, self.db_port))

            self.db_connection = mysql.connector.pooling.MySQLConnectionPool(
                pool_name=db_pool_name,
                pool_size=db_pool_size,
                pool_reset_session=True,
                host=self.host_ip,
                user=self.host_username,
                password=self.db_password,
                database=self.db_name)

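As for the multiprocessing, what I currently do is define the worker functions in a separate file, each creating its own database connection: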

def log_program_log_to_server(status_code: str, parameters: Optional[str], log_datetime: datetime):
    # A new DatabaseManager (and connection pool) is built on every call.
    server_connection = DatabaseManager(
        host_ip=server_ip,
        host_username='something',
        db_password='something',
        db_name='something',
        db_ip=server_ip,
        db_port=3306,
        db_pool_name='server_db_pool',
        db_pool_size=5
    )
    server_connection.write_master_program_log_table(status_code, parameters, log_datetime)

These functions are used in the multiprocessing context as follows:

    def log_status_code_to_server(self, status_code: StatusCodes, params: Optional[dict], timestamp: datetime):
        print("logging...")
        self.pool.apply_async(log_program_log_to_server, args=(status_code, params, timestamp))
        print("logged!")

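This is not ideal, however, because the connection is re-established every time the function is called. What should happen instead is that each process in the multiprocessing pool is initialized once and already holds its own database connections.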

So my question is: if I use the _init_worker function above to initialize the worker processes, how exactly do I use those connection pools?

Any help is appreciated!

python python-3.x multiprocessing mysql-connector
1 Answer

What I would do is create a pool initializer that defines the connection pools as global variables, for example:

def init_pool_processes(local_ip, linked_ip, server_ip):
    global local_connection, linked_connection, server_connection

    local_connection = DatabaseManager(
        ...
        db_ip=local_ip,
        ...
    )

    linked_connection = DatabaseManager(
        ...
        db_ip=linked_ip,
        ...
    )

    server_connection = DatabaseManager(
        ...
        db_ip=server_ip,
        ...
    )

Your MultiprocessingManager.__init__ method would then look like this:

            ...
            self.pool = Pool(
                processes=multiprocessing.cpu_count(),
                # Pass the function itself; Pool calls it in each worker with initargs.
                initializer=init_pool_processes,
                initargs=(local_ip, linked_ip, server_ip)
            )
            ...
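
A detail that is easy to get wrong here: Pool expects initializer to be a callable and takes its arguments separately through initargs. Writing initializer=f(...) would call f once in the parent process and pass its return value (usually None) to Pool, so no worker would run it. A minimal sketch, with a hypothetical remember initializer and read_state task that are not part of the original code:

```python
import os
from multiprocessing import Pool


def remember(tag: str) -> None:
    """Hypothetical initializer: stores per-process state in a module-level global."""
    global worker_state
    worker_state = (tag, os.getpid())


def read_state(_):
    # Runs in a worker and returns the state that the initializer set in that worker.
    return worker_state


if __name__ == "__main__":
    # Pass the function object and its arguments separately.
    with Pool(processes=2, initializer=remember, initargs=("db",)) as pool:
        states = pool.map(read_state, range(4))
    # Each entry carries the PID of the worker whose initializer produced it.
    for tag, pid in states:
        print(tag, pid)
```

Because remember returns None, initializer=remember("db") would be equivalent to initializer=None, and read_state would then fail in the workers with a NameError.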

Finally, your worker function becomes:

def log_program_log_to_server(status_code: str, parameters: Optional[str], log_datetime: datetime):
    # server_connection is a global initialized by our pool initializer:
    server_connection.write_master_program_log_table(status_code, parameters, log_datetime)

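But my question to you is this: why does each process in the multiprocessing pool need a connection pool, given that log_program_log_to_server appears to use only a single connection? The pool initializer init_pool_processes should therefore create plain connections rather than connection pools. If you must use a connection pool, set its size to 1.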
