使用ThreadPoolExecutor后如何清理线程本地数据?

问题描述 投票:0回答:1

我想使用

ThreadPoolExecutor
来并行化一些(遗留)代码与数据库访问。我想避免为每个线程创建一个新的数据库连接,因此我使用
threading.local()
来为每个线程保留一个连接,例如:

thread_local = threading.local()

def get_thread_db():
    if not hasattr(thread_local, "db"):
        thread_local.db = get_database_connection()
    return thread_local.db

def do_stuff():
    db_conn = get_thread_db()
    # ... 

with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    futures = [executor.submit(do_stuff, *params) for params in data]
    # ...

如何保证所有提交的任务完成后,所有线程的连接都正确关闭?看起来

ThreadPoolExecutor
initializer
参数,但没有退出处理程序的参数。

python multithreading database-connection threadpool python-multithreading
1个回答
0
投票

清理线程本地存储时我遵循的一般模式是创建一个类,该类封装需要清理的数据(对象),并具有一个

__del__
方法,当线程本地数据被垃圾收集时将调用该方法。此方法将执行所需的所有清理工作。

根据您的具体情况,我会创建一个类,例如

DB_Connection
,将数据库连接封装为属性
_connection
,其
__del__
方法将关闭连接:

import concurrent.futures
import threading

thread_local = threading.local()

class DB_Connection:
    def __init__(self):
        self._connection = get_database_connection()
        
    def __del__(self):
        self._connection.close()
        
    @classmethod
    def get_thread_db(cls):
        if not hasattr(thread_local, "db"):
            thread_local.db = cls()
        return thread_local.db._connection
        

def do_stuff():
    db_conn = DB_Connection.get_thread_db()
    # ... 

with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    futures = [executor.submit(do_stuff, *params) for params in data]
    # ...
© www.soinside.com 2019 - 2024. All rights reserved.