python 多处理 - 初始化/传递数据库连接以便跨进程使用的最佳方法

问题描述 投票:0回答:3

我在使用 Python 多处理包中的 pool.map 传递数据库连接对象或游标对象时遇到一些困难。基本上,我想创建一个工作池,每个工作池都有自己的状态和数据库连接,以便它们可以并行执行查询。

我已经尝试过这些方法,但我在 python 中遇到了 picklingerror -

带有 2 个参数的泳池地图

使用初始化器设置多进程池

第二个链接正是我需要做的,这意味着我希望每个进程在启动时打开一个数据库连接,然后使用该连接来处理传入的数据/参数。

这是我的代码。

import multiprocessing as mp

def process_data((id,db)):
  print 'in processdata'
  cursor = db.cursor()
  query = ....
  #cursor.execute(query)
  #....
  .....
  .....
  return row

`if __name__ == '__main__':

  db = getConnection() 
  cursor = db.cursor() 
  print 'Initialised db connection and cursor'
  inputs = [1,2,3,4,5]
  pool = mp.Pool(processes=2)
  result_list = pool.map(process_data,zip(inputs,repeat(db)))
  #print result_list
  pool.close()
  pool.join() 

`

这会导致以下错误 -

`Exception in thread Thread-1:
  Traceback (most recent call last):
  File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
    put(task)
  PicklingError: Can't pickle <type 'module'>: attribute lookup __builtin__.module failed`

我猜想根据python,db或游标对象是不可picklable的,因为如果我将repeat(db),替换为repeat(x),其中x是int或string,它就可以工作。 我尝试过使用初始化函数,它似乎有效,最初但当我执行查询时会发生奇怪的事情,当存在数据时,许多 id 不会返回任何内容。

实现这一目标的最佳方法是什么?我在 Linux 机器上使用 python 2.6.6。

python database multiprocessing pool initializer
3个回答
10
投票

我将继续将我的评论作为答案,因为我认为这是合适的。 您不想尝试将数据库连接从父进程传递到子进程。 您想要将静态数据或其他可以序列化的对象移动到您的子进程。 您可以传递数据行等。或者您希望您的孩子在必要时建立自己的数据库连接。


0
投票

虽然我同意我们应该避免将数据库连接从父进程传递到子进程,但最好确保子进程不会在每次函数调用时不必要地重新建立连接。

有很多方法可以做到这一点,但我发现最简单的方法是将有状态对象(例如数据库连接、boto3 客户端等)的初始化移动到您记忆的函数中。这避免了上面提到的问题,同时可以利用连接池等功能。

这是一个示例,演示每个工作人员在函数调用中使用相同的客户端:

import boto3
from functools import cache
from multiprocessing import Pool, current_process


@cache
def get_client():
    return boto3.client("glue")


def do_work(_):
    client = get_client()
    print(f"process {current_process()} has glue cilent {id(client)}")


def main():
    with Pool(processes=4) as pool:
        pool.map(do_work, range(12))


if __name__ == "__main__":
    main()

我希望这对其他人有帮助!


-2
投票

尝试腌制数据库连接对象。酸洗与工艺无关。所以它可能会起作用..

参考这些页面 - python pickle
泡菜示例

© www.soinside.com 2019 - 2024. All rights reserved.