我想在我的自定义类中将多处理模块与 sqlAlchemy 一起使用。 这是代码:
from sqlalchemy import create_engine
# Module-level engine; note Client.__init__ below builds its own separate engine.
# NOTE(review): this snippet also needs ``from multiprocessing import Pool`` for
# the ``Pool(6)`` call inside Client.__init__ — as written it would raise
# NameError before ever reaching the pickle error shown in the traceback.
engine = create_engine(f'mysql+pymysql://a:b@localhost:3306/', server_side_cursors=True, pool_size=20)
class Client(object):
    """Demo client that keeps a SQLAlchemy engine on the instance.

    NOTE(review): ``self.engine`` carries thread-local state, so pickling
    an instance for ``multiprocessing`` fails — this class reproduces the
    reported ``TypeError: can't pickle _thread._local objects``.
    """

    def __init__(self):
        # Both the engine (unpicklable) and the worker pool live on self.
        self.engine = create_engine(f'mysql+pymysql://a:b@localhost:3306/', server_side_cursors=True, pool_size=20)
        self.pool = Pool(6)

    def run_in_process(self, x):
        # Runs in a worker process, so self — engine included — must be
        # pickled and sent to the child first.
        connection = self.engine.connect()
        print(connection)

    def run(self):
        payload = 'x'
        pending = self.pool.apply_async(self.run_in_process, (payload,))
        pending.get()

    def __getstate__(self):
        # Strip the (unpicklable) pool before pickling; the engine stays
        # behind and is what actually breaks pickling.
        state = self.__dict__.copy()
        del state['pool']
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
# Guard the entry point: multiprocessing with the spawn start method
# (Windows/macOS default) re-imports this module in every child, so
# unguarded top-level work would recurse.
if __name__ == '__main__':
    client = Client()
    client.run()
显示错误:
File "test_pickle.py", line 32, in <module>
client.run()
File "test_pickle.py", line 19, in run
res.get()
File "/home/airflow/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/pool.py", line 657, in get
raise self._value
File "/home/airflow/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/pool.py", line 431, in _handle_tasks
put(task)
File "/home/airflow/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/home/airflow/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: can't pickle _thread._local objects
我知道由于pickle,多重处理有时会很麻烦,并且我知道这个问题是由于self.engine造成的,因为它无法被pickle。但我必须在类中使用
engine
这个变量。
那么，在我的示例中，如何让 engine 变得可以被 pickle（序列化）？
提前致谢。
我找不到序列化 SQLAlchemy engine 的方法。我认为在不修改 SQLAlchemy 源代码的前提下，这是做不到的。
但是,我有两种将
SQLAlchemy
与 multiprocessing
一起使用的解决方案。
在每个子进程中创建
engine
,并确保使用后关闭连接。
from sqlalchemy import create_engine
from multiprocessing import Pool
class Client(object):
    """Client that defers engine creation to each worker process.

    Only the (picklable) connection URL travels to the workers; every
    worker builds, uses, and disposes of its own engine.
    """

    def __init__(self):
        # Keep only the URL on the instance — plain strings pickle fine.
        self.url = 'mysql+pymysql://a:b@localhost:3306/'
        self.pool = Pool(6)

    def run_in_process(self, x):
        # Build a fresh engine inside the child process.
        local_engine = create_engine(self.url)
        connection = local_engine.connect()
        print(connection)
        # Release the connection and the engine's pooled resources
        # before the worker returns.
        connection.close()
        local_engine.dispose()

    def run(self):
        payload = 'x'
        pending = self.pool.apply_async(self.run_in_process, (payload,))
        pending.get()

    def __getstate__(self):
        # The process pool itself cannot be pickled — strip it.
        state = self.__dict__.copy()
        del state['pool']
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
# Guard the entry point: multiprocessing with the spawn start method
# (Windows/macOS default) re-imports this module in every child, so
# unguarded top-level work would recurse.
if __name__ == '__main__':
    client = Client()
    client.run()
或者在 class 之外创建 engine，并将其作为 global（全局）变量，使其可以从任何地方访问。
from sqlalchemy import create_engine
from multiprocessing import Pool
# Make the engine accessible from everywhere: module-level names are
# already global, so any function in this module can read ``engine``
# directly.  (A ``global`` statement at module scope is a no-op and has
# been removed — it only has meaning inside a function body.)
engine = create_engine('mysql+pymysql://a:b@localhost:3306/')
class Client(object):
    """Client whose workers use the module-level ``engine``.

    Nothing engine-related lives on the instance, so pickling only has
    to cope with the pool, which ``__getstate__`` strips out.
    """

    def __init__(self):
        self.pool = Pool(6)

    def run_in_process(self, x):
        # ``engine`` is resolved from the module globals inside the child.
        connection = engine.connect()
        print(connection)

    def run(self):
        payload = 'x'
        pending = self.pool.apply_async(self.run_in_process, (payload,))
        pending.get()

    def __getstate__(self):
        # Only the pool is unpicklable here — drop it.
        state = self.__dict__.copy()
        del state['pool']
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
# Guard the entry point: multiprocessing with the spawn start method
# (Windows/macOS default) re-imports this module in every child, so
# unguarded top-level work would recurse.
if __name__ == '__main__':
    client = Client()
    client.run()