我正在尝试设置一个工作进程池,其中每个进程都有一组数据库连接池
这是类定义
class MultiprocessingManager:
    """Singleton that owns one ``multiprocessing`` worker pool.

    Every construction returns the same instance; the pool is created only
    the first time ``__init__`` runs on that instance.
    """

    _instance = None  # Class variable to hold the singleton instance

    def __new__(cls, local_ip, linked_ip, server_ip):
        """
        Ensures that MultiprocessingManager is only created once
        """
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            # Flag read by __init__, which Python re-runs on every construction.
            cls._instance._initialized = False
        return cls._instance

    def __init__(self, local_ip, linked_ip, server_ip):
        """Create the worker pool on first construction; later calls are no-ops.

        Args:
            local_ip: Forwarded to the worker initializer.
            linked_ip: Forwarded to the worker initializer.
            server_ip: Forwarded to the worker initializer.
        """
        if not self._initialized:
            print("Initializing MultiprocessingManager instance...")
            # Pass the initializer *callable* plus its arguments. Writing
            # ``initializer=self._init_worker(...)`` would call it once here in
            # the parent and give Pool its return value (None), so no worker
            # process would ever run it.
            # NOTE: under the "spawn" start method the initializer has to be
            # picklable; a bound method drags ``self`` along, so a module-level
            # function is the safer choice on those platforms.
            self.pool = Pool(
                processes=multiprocessing.cpu_count(),
                initializer=self._init_worker,
                initargs=(local_ip, linked_ip, server_ip),
            )
            self._initialized = True
            print("Initializing MultiprocessingManager instance...")
这是尝试建立连接的
_init_worker
函数:
def _init_worker(self, local_ip, linked_ip, server_ip):
    """
    Used to initialize each PID worker with dedicated DBManagers

    The three managers are bound to module-level globals so that they outlive
    this call and can be reused by the task functions that later run in the
    same process. As plain locals they would be discarded as soon as the
    initializer returned.

    Args:
        local_ip: Host/DB address for the ``local_db_pool`` manager.
        linked_ip: Host/DB address for the ``linked_db_pool`` manager.
        server_ip: Host/DB address for the ``server_db_pool`` manager.
    """
    global local_connection, linked_connection, server_connection

    local_connection = DatabaseManager(
        host_ip=local_ip,
        host_username='something',
        db_password='something',
        db_name='something',
        db_ip=local_ip,
        db_port=3306,
        db_pool_name='local_db_pool',
        db_pool_size=5,
    )
    linked_connection = DatabaseManager(
        host_ip=linked_ip,
        host_username='something',
        db_password='something',
        db_name='something',
        db_ip=linked_ip,
        db_port=3306,
        db_pool_name='linked_db_pool',
        db_pool_size=5,
    )
    server_connection = DatabaseManager(
        host_ip=server_ip,
        host_username='something',
        db_password='something',
        db_name='something',
        db_ip=server_ip,
        db_port=3306,
        db_pool_name='server_db_pool',
        db_pool_size=5,
    )
下面是我在 DatabaseManager 类中建立 MySQL 连接的方式:
# Body of the DatabaseManager constructor (its `def` line is not part of this excerpt).
print('DatabaseManager instantiation...')
# Keep the connection settings on the instance.
self.host_ip = host_ip
self.host_username = host_username
self.db_password = db_password
self.db_name = db_name
self.db_ip = db_ip
self.db_port = db_port
# SOCK_DGRAM is a UDP socket: connect() only records the peer address and
# exchanges no packets, so this does not check that the database is reachable.
# NOTE(review): the socket is not closed anywhere in the visible code — confirm
# it is released elsewhere.
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.connect((self.db_ip, self.db_port))
# Create the named MySQL connection pool.
# NOTE(review): db_ip and db_port are not passed to the pool (it is given
# host_ip and no port) — confirm that is intended.
self.db_connection = mysql.connector.pooling.MySQLConnectionPool(
pool_name=db_pool_name,
pool_size=db_pool_size,
pool_reset_session=True,
host=self.host_ip,
user=self.host_username,
password=self.db_password,
database=self.db_name)
在多进程方面,目前的做法是在单独的文件中创建工作函数,每个函数都自行创建数据库连接:
def log_program_log_to_server(status_code: str, parameters: Optional[str], log_datetime: datetime):
    """Write one program-log entry via ``write_master_program_log_table``.

    A new ``DatabaseManager`` (pool name ``server_db_pool``) is constructed on
    every call and used for the single write.

    Args:
        status_code: Passed through to the write call.
        parameters: Passed through to the write call; may be ``None``.
        log_datetime: Passed through to the write call.
    """
    manager_settings = dict(
        host_ip=server_ip,
        host_username='something',
        db_password='something',
        db_name='something',
        db_ip=server_ip,
        db_port=3306,
        db_pool_name='server_db_pool',
        db_pool_size=5,
    )
    manager = DatabaseManager(**manager_settings)
    manager.write_master_program_log_table(status_code, parameters, log_datetime)
这些函数在多进程上下文中的使用方式如下:
def log_status_code_to_server(self, status_code: StatusCodes, params: Optional[dict], timestamp: datetime):
    """Queue a program-log write on the worker pool without waiting for it.

    Args:
        status_code: Forwarded to ``log_program_log_to_server``.
        params: Forwarded to ``log_program_log_to_server``; may be ``None``.
        timestamp: Forwarded to ``log_program_log_to_server``.
    """
    print("logging...")
    self.pool.apply_async(
        log_program_log_to_server,
        args=(status_code, params, timestamp),
        # The AsyncResult is discarded, so without an error_callback any
        # exception raised in the worker would disappear silently.
        error_callback=lambda exc: print(f"log_program_log_to_server failed: {exc!r}"),
    )
    # NOTE: printed once the task has been queued, not once the write is done.
    print("logged!")
但这并不理想,因为每次调用此函数时都会重新建立连接。真正需要的是:进程池中的每个进程在初始化时就建立好自己的数据库连接。
所以我的问题是,如果我使用上面的
_init_worker
函数进行工作进程初始化,我到底如何使用这些连接池?
如有任何帮助,不胜感激!
我的做法是创建一个进程池初始化函数(initializer),在其中将连接池定义为全局变量,例如:
# Intended as a Pool initializer: builds three DatabaseManager objects and binds
# them to module-level globals so task functions in the same process can use them.
def init_pool_processes(local_ip, linked_ip, server_ip):
# Declared global so the objects outlive this call.
global local_connection, linked_connection, server_connection
# Each `...` below marks constructor arguments elided in this example.
local_connection = DatabaseManager(
...
db_ip=local_ip,
...
)
linked_connection = DatabaseManager(
...
db_ip=linked_ip,
...
)
server_connection = DatabaseManager(
...
db_ip=server_ip,
...
)
那么你的
MultiprocessingManager.__init__
方法将如下所示:
...
# Pass the initializer itself plus its arguments. Writing
# ``initializer=init_pool_processes(...)`` would call it once in the parent
# process and hand Pool its return value (None), so no worker would ever be
# initialized.
self.pool = Pool(
    processes=multiprocessing.cpu_count(),
    initializer=init_pool_processes,
    initargs=(local_ip, linked_ip, server_ip),
)
...
最后,你的工作函数变成:
def log_program_log_to_server(status_code: str, parameters: Optional[str], log_datetime: datetime):
    """Write one program-log entry using the module-level ``server_connection``.

    Args:
        status_code: Passed through to the write call.
        parameters: Passed through to the write call; may be ``None``.
        log_datetime: Passed through to the write call.
    """
    # server_connection is a global initialized by our pool initializer:
    write_log = server_connection.write_master_program_log_table
    write_log(status_code, parameters, log_datetime)
既然
log_program_log_to_server
似乎只使用单个连接,为什么多进程池中的每个进程都需要一个连接池?因此,池初始化函数 init_pool_processes
应该创建普通连接而不是连接池。如果必须使用连接池,请将池的大小设置为 1。