我的 Psycopg3 异步连接池存在泄漏问题,但我找不到问题的根源。
我扩展了 Flask 应用程序,一旦服务器启动,它就会创建一个异步连接池。
然后,每当发出请求时,应用程序都会获取从数据库检索数据、执行查询、处理结果并向用户返回结果所需的连接。
我的实现工作了一段时间,但使用几次后,它就停止工作了。抛出的异常是
psycopg_pool.PoolTimeout
。
当我获取池统计信息时,我看到
pool_available = 0
,而 requests_queued
和 requests_errors
在每次新尝试中不断增加,表明可用连接已耗尽。
谁能帮我找到问题的根源吗?
这是实现:
# db.py
import os
from psycopg_pool import AsyncConnectionPool
class DB:
def __init__(self):
self.async_pool = None
def connect_async(self):
conninfo = (f"dbname={os.environ['DBNAME']} "
f"user={os.environ['POSTGRES_USER']} "
f"password={os.environ['POSTGRES_PASSWORD']} "
f"host={os.environ['POSTGRES_HOST']} "
f"port={os.environ['POSTGRES_PORT']}")
self.async_pool = AsyncConnectionPool(
conninfo, min_size=4, max_size=10, timeout=15)
def get_async_pool(self):
if self.async_pool is None:
self.connect_async()
return self.async_pool
# extensions.py
from db import DB
my_db = DB()
# search.py
from extensions import my_db
async def keyword_search(query):
async with my_db.get_async_pool().connection() as aconn:
async with aconn.cursor() as cur:
# query = ... keyword search logic
await cur.execute(query)
documents = await cur.fetchall()
column_names = [desc[0] for desc in cur.description]
return [dict(zip(column_names, doc)) for doc in documents]
async def semantic_search(query):
async with my_db.get_async_pool().connection() as aconn:
async with aconn.cursor() as cur:
# query = ... semantic search logic
await cur.execute(query)
documents = await cur.fetchall()
column_names = [desc[0] for desc in cur.description]
return [dict(zip(column_names, doc)) for doc in documents]
async def full_search(query):
ks, ss = await asyncio.gather(keyword_search(query), semantic_search(query))
return ks + ss
from extensions import my_db
from search import full_search
@bp.route('/search', methods=['GET'])
async def search_fn():
query = request.args.get('query')
for _ in range(2):
try:
results = await full_search(query)
return jsonify(results)
except psycopg_pool.PoolTimeout as e:
stats = my_db.get_async_pool().get_stats()
app.logger.error({'error': str(e), 'pool_stats': stats})
return jsonify({'error': 'Database connection timeout'}), 500
其他可能相关的信息:
search_fn
;所有其他都是同步的。提前致谢!
异步上下文管理器中似乎存在错误。我做了一个修复,您可以在这篇文章中找到:Psycopg 连接池中的连接泄漏
[我还发布了另一个似乎性能更高的修复程序。]