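I'm using `ThreadPoolExecutor` to parallelize some (legacy) code that accesses a database. I want to avoid opening a new database connection every time a task runs, so I use `threading.local()` to keep one connection per thread, for example: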
```python
import concurrent.futures
import threading

thread_local = threading.local()

def get_thread_db():
    # Lazily open one connection per worker thread and cache it there.
    if not hasattr(thread_local, "db"):
        thread_local.db = get_database_connection()
    return thread_local.db

def do_stuff(*params):
    db_conn = get_thread_db()
    # ...

with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    futures = [executor.submit(do_stuff, *params) for params in data]
    # ...
```
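How can I make sure that every thread's connection is properly closed once all submitted tasks have finished? `ThreadPoolExecutor` has an `initializer` parameter, but there seems to be no corresponding parameter for an exit handler.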
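The general pattern I follow for cleaning up thread-local storage is to create a class that wraps the data (object) that needs cleanup and give it a `__del__` method, which is called when the thread-local data is garbage-collected. That method performs all the cleanup required. In CPython, a thread's thread-local data is released when that thread exits, and the worker threads of a `ThreadPoolExecutor` exit when the executor is shut down (for example, when the `with` block ends).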
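A minimal sketch of that mechanism, assuming CPython: an object stored on a `threading.local()` is released when the thread that stored it exits, which triggers its `__del__`. `Resource` and `closed` are hypothetical names used only for illustration.

```python
import threading

closed = []  # records the names of objects whose __del__ has run

class Resource:
    """Hypothetical stand-in for a wrapped database connection."""

    def __init__(self, name):
        self.name = name

    def __del__(self):
        # Runs when the last reference disappears; here, that is when
        # the owning thread exits and its thread-local data is discarded.
        closed.append(self.name)

local = threading.local()

def worker(name):
    local.resource = Resource(name)  # only reference lives in thread-local storage

t = threading.Thread(target=worker, args=("worker-1",))
t.start()
t.join()
print(closed)  # in CPython: ['worker-1']
```

In CPython, `join()` returns only after the thread's state has been torn down, so the cleanup has already happened. Implementations without reference counting (such as PyPy) may defer `__del__` until a later garbage collection.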
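For your specific case, I would create a class such as `DB_Connection` that wraps the database connection in a `_connection` attribute and whose `__del__` method closes the connection: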
```python
import concurrent.futures
import threading

thread_local = threading.local()

class DB_Connection:
    """Wraps a database connection and closes it when the wrapper is garbage-collected."""

    def __init__(self):
        self._connection = get_database_connection()

    def __del__(self):
        # Called when the owning thread's thread-local data is discarded.
        self._connection.close()

    @classmethod
    def get_thread_db(cls):
        # Create one wrapper (and therefore one connection) per thread on first use.
        if not hasattr(thread_local, "db"):
            thread_local.db = cls()
        return thread_local.db._connection

def do_stuff(*params):
    db_conn = DB_Connection.get_thread_db()
    # ...

with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    futures = [executor.submit(do_stuff, *params) for params in data]
    # ...
```
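If you would rather not depend on when `__del__` runs (it is tied to garbage collection, and Python does not guarantee it is called for objects still alive at interpreter exit), one alternative, not part of the answer above, is to register every connection in a shared, lock-protected list and close them all explicitly after the `with` block. A sketch, where `FakeConnection` and this `get_database_connection` are hypothetical stand-ins for the real driver:

```python
import concurrent.futures
import threading

class FakeConnection:
    """Hypothetical stand-in for whatever the real driver returns."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True

def get_database_connection():
    # Hypothetical factory; replace with the real connection call.
    return FakeConnection()

thread_local = threading.local()
all_connections = []                 # every connection opened by any worker
all_connections_lock = threading.Lock()

def get_thread_db():
    # Open one connection per worker thread and register it centrally.
    if not hasattr(thread_local, "db"):
        thread_local.db = get_database_connection()
        with all_connections_lock:
            all_connections.append(thread_local.db)
    return thread_local.db

def do_stuff(x):
    db_conn = get_thread_db()
    # ... use db_conn ...
    return x * 2

data = [1, 2, 3, 4, 5, 6]
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(do_stuff, x) for x in data]
    results = [f.result() for f in futures]

# Leaving the `with` block waited for all tasks and worker threads,
# so no worker can still be using a connection at this point.
for conn in all_connections:
    conn.close()

print(results)                                  # [2, 4, 6, 8, 10, 12]
print(all(c.closed for c in all_connections))  # True
```

Leaving the `with` block calls `executor.shutdown(wait=True)`, so every task has finished and every worker thread has exited before the cleanup loop runs. The trade-off is a little shared state in exchange for deterministic, explicit cleanup.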