我在使用 Python 多处理包中的 pool.map 传递数据库连接对象或游标对象时遇到一些困难。基本上,我想创建一个工作池,每个工作池都有自己的状态和数据库连接,以便它们可以并行执行查询。
我已经尝试过这些方法,但我在 python 中遇到了 picklingerror -
第二个链接正是我需要做的,这意味着我希望每个进程在启动时打开一个数据库连接,然后使用该连接来处理传入的数据/参数。
这是我的代码。
import multiprocessing as mp
def process_data((id,db)):
print 'in processdata'
cursor = db.cursor()
query = ....
#cursor.execute(query)
#....
.....
.....
return row
`if __name__ == '__main__':
db = getConnection()
cursor = db.cursor()
print 'Initialised db connection and cursor'
inputs = [1,2,3,4,5]
pool = mp.Pool(processes=2)
result_list = pool.map(process_data,zip(inputs,repeat(db)))
#print result_list
pool.close()
pool.join()
`
这会导致以下错误 -
`Exception in thread Thread-1:
Traceback (most recent call last):
File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner
self.run()
File "/usr/lib/python2.6/threading.py", line 484, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'module'>: attribute lookup __builtin__.module failed`
我猜想根据python,db或游标对象是不可picklable的,因为如果我将repeat(db),替换为repeat(x),其中x是int或string,它就可以工作。 我尝试过使用初始化函数,它似乎有效,最初但当我执行查询时会发生奇怪的事情,当存在数据时,许多 id 不会返回任何内容。
实现这一目标的最佳方法是什么?我在 Linux 机器上使用 python 2.6.6。
我将继续将我的评论作为答案,因为我认为这是合适的。 您不想尝试将数据库连接从父进程传递到子进程。 您想要将静态数据或其他可以序列化的对象移动到您的子进程。 您可以传递数据行等。或者您希望您的孩子在必要时建立自己的数据库连接。
虽然我同意我们应该避免将数据库连接从父进程传递到子进程,但最好确保子进程不会在每次函数调用时不必要地重新建立连接。
有很多方法可以做到这一点,但我发现最简单的方法是将有状态对象(例如数据库连接、boto3 客户端等)的初始化移动到您记忆的函数中。这避免了上面提到的问题,同时可以利用连接池等功能。
这是一个示例,演示每个工作人员在函数调用中使用相同的客户端:
import boto3
from functools import cache
from multiprocessing import Pool, current_process
@cache
def get_client():
return boto3.client("glue")
def do_work(_):
client = get_client()
print(f"process {current_process()} has glue cilent {id(client)}")
def main():
with Pool(processes=4) as pool:
pool.map(do_work, range(12))
if __name__ == "__main__":
main()
我希望这对其他人有帮助!
尝试腌制数据库连接对象。酸洗与工艺无关。所以它可能会起作用..
参考这些页面 - python pickle
泡菜示例