Cannot pickle a SQLAlchemy engine inside a class

Question (votes: 0, answers: 1)

I want to use the multiprocessing module together with SQLAlchemy inside my custom class. Here is the code:

from multiprocessing import Pool
from sqlalchemy import create_engine
engine = create_engine(f'mysql+pymysql://a:b@localhost:3306/', server_side_cursors=True, pool_size=20)


class Client(object):
    def __init__(self):
        self.engine = create_engine(f'mysql+pymysql://a:b@localhost:3306/', server_side_cursors=True, pool_size=20)
        self.pool = Pool(6)

    def run_in_process(self, x): 
        conn = self.engine.connect()
        print(conn)

    def run(self):
        x = 'x' 
        res = self.pool.apply_async(self.run_in_process, (x,))
        res.get()

    def __getstate__(self):
        self_dict = self.__dict__.copy()
        del self_dict['pool']
        return self_dict

    def __setstate__(self, state):
        self.__dict__.update(state)


pool = Pool(6)
client = Client()
client.run()

It fails with this error:

  File "test_pickle.py", line 32, in <module>
    client.run()
  File "test_pickle.py", line 19, in run
    res.get()
  File "/home/airflow/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/pool.py", line 657, in get
    raise self._value
  File "/home/airflow/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/pool.py", line 431, in _handle_tasks
    put(task)
  File "/home/airflow/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/home/airflow/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: can't pickle _thread._local objects

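I know that multiprocessing can be troublesome because of pickle, and I know this problem is caused by self.engine, which cannot be pickled. But I have to use the engine variable inside the class.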

So, how can I make the engine picklable in my example?

Thanks in advance.

python sqlalchemy multiprocessing pickle
1 answer (votes: 0)

I could not find a way to serialize a SQLAlchemy engine, and I don't think it is possible without touching the SQLAlchemy source code.
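The `_thread._local` named in the traceback hints at why: while pickling the `Client` instance, pickle reached a thread-local object, and those cannot be serialized. Below is a minimal sketch that reproduces the same kind of failure with the standard library only. `FakeEngine` is a hypothetical stand-in that merely holds a `threading.local()`; it is not SQLAlchemy's actual class.

```python
import pickle
import threading


class FakeEngine:
    """Hypothetical stand-in for an object that keeps thread-local state."""

    def __init__(self):
        self._local = threading.local()


try:
    pickle.dumps(FakeEngine())
    error = None
except TypeError as exc:
    # Same failure class as in the question's traceback
    error = exc

print(type(error).__name__, error)
```

The exact wording of the message differs between Python versions, but in each case it names `_thread._local`.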

However, I have two solutions for using SQLAlchemy and multiprocessing together.

Create the engine in each child process, and make sure to close the connection after use.

from sqlalchemy import create_engine
from multiprocessing import Pool

class Client(object):
    def __init__(self):
        # Store the connection details
        self.url = 'mysql+pymysql://a:b@localhost:3306/'
        self.pool = Pool(6)

    def run_in_process(self, x):
        engine = create_engine(self.url)

        conn = engine.connect()
        print(conn)

        # Make sure to disconnect
        conn.close()
        engine.dispose()

    def run(self):
        x = 'x'
        res = self.pool.apply_async(self.run_in_process, (x,))
        res.get()

    def __getstate__(self):
        self_dict = self.__dict__.copy()
        del self_dict['pool']
        return self_dict

    def __setstate__(self, state):
        self.__dict__.update(state)

client = Client()
client.run()
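If the engine really has to stay an attribute of the class, the same idea can be pushed into `__getstate__`: ship only the URL and rebuild the handle lazily on first use in whichever process needs it. The following is a sketch under assumptions, not a tested SQLAlchemy recipe. `LazyClient` and its `engine` property are hypothetical names, and `sqlite3.connect` stands in for `create_engine` (a sqlite3 connection is also unpicklable) so that the example runs without a MySQL server.

```python
import pickle
import sqlite3


class LazyClient:
    """Keeps only picklable connection details; the handle is rebuilt on demand."""

    def __init__(self, url):
        self.url = url
        self._engine = None  # created lazily, once per process

    @property
    def engine(self):
        # Stand-in for create_engine(self.url)
        if self._engine is None:
            self._engine = sqlite3.connect(self.url)
        return self._engine

    def __getstate__(self):
        state = self.__dict__.copy()
        state["_engine"] = None  # never ship the live handle to another process
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)


client = LazyClient(":memory:")
client.engine.execute("SELECT 1")  # the parent now holds a live handle

# Roughly what multiprocessing does with self when a bound method is submitted
clone = pickle.loads(pickle.dumps(client))
print(clone.url, clone._engine)
```

The copy arrives without a handle and creates its own the first time `clone.engine` is accessed, so each process ends up with a separate connection.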

Or create the engine outside the class and make it a global variable, so that it can be accessed from anywhere.

from sqlalchemy import create_engine
from multiprocessing import Pool

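# Module-level engine: methods reach it by name instead of through self,
# so it is never pickled together with the Client instance.
# (A `global` statement is not needed at module level.)
engine = create_engine('mysql+pymysql://a:b@localhost:3306/')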

class Client(object):
    def __init__(self):
        self.pool = Pool(6)

    def run_in_process(self, x):
        conn = engine.connect()
        print(conn)

    def run(self):
        x = 'x'
        res = self.pool.apply_async(self.run_in_process, (x,))
        res.get()

    def __getstate__(self):
        self_dict = self.__dict__.copy()
        del self_dict['pool']
        return self_dict

    def __setstate__(self, state):
        self.__dict__.update(state)

client = Client()
client.run()
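A variation on the global approach is to let each worker process fill in its own global through the `initializer` argument of `multiprocessing.Pool`, so that no database handle is shared between parent and children. This is only a sketch: `init_worker`, `run_in_process` and `_engine` are illustrative names, and `sqlite3.connect` again stands in for `create_engine` to keep the example runnable without a database server.

```python
import os
import sqlite3
from multiprocessing import Pool

_engine = None  # one handle per worker process, set by init_worker


def init_worker(url):
    # Runs once in each worker; with SQLAlchemy this is where
    # create_engine(url) would go (sqlite3 is only a stand-in here).
    global _engine
    _engine = sqlite3.connect(url)


def run_in_process(x):
    # Uses the worker's own handle; nothing unpicklable crosses processes
    value = _engine.execute("SELECT ? * 2", (x,)).fetchone()[0]
    return os.getpid(), value


if __name__ == "__main__":
    with Pool(2, initializer=init_worker, initargs=(":memory:",)) as pool:
        results = pool.map(run_in_process, [1, 2, 3])
    print([value for _, value in results])
```

Because the worker functions live at module level and the pool is created under the `__main__` guard, the sketch does not depend on pickling a class instance at all.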