Psycopg3异步连接池泄漏/超时问题

问题描述 投票:0回答:1

我的 Psycopg3 异步连接池存在泄漏问题,但我找不到问题的根源。

我扩展了 Flask 应用程序,一旦服务器启动,它就会创建一个异步连接池。

然后,每当发出请求时,应用程序都会获取从数据库检索数据、执行查询、处理结果并向用户返回结果所需的连接。

我的实现工作了一段时间,但使用几次后,它就停止工作了。抛出的异常是

psycopg_pool.PoolTimeout

当我获取池统计信息时,我看到

pool_available = 0
,而
requests_queued
requests_errors
在每次新尝试中不断增加,表明可用连接已耗尽。

谁能帮我找到问题的根源吗?

这是实现:

  1. 可重用的连接池:
# db.py
import os
from psycopg_pool import AsyncConnectionPool

class DB:
    def __init__(self):
        self.async_pool = None

    def connect_async(self):
        conninfo = (f"dbname={os.environ['DBNAME']} "
                    f"user={os.environ['POSTGRES_USER']} "
                    f"password={os.environ['POSTGRES_PASSWORD']} "
                    f"host={os.environ['POSTGRES_HOST']} "
                    f"port={os.environ['POSTGRES_PORT']}")
        self.async_pool = AsyncConnectionPool(
            conninfo, min_size=4, max_size=10, timeout=15)

    def get_async_pool(self):
        if self.async_pool is None:
            self.connect_async()
        return self.async_pool

# extensions.py
from db import DB

my_db = DB()
  1. 查询执行:
# search.py
from extensions import my_db

async def keyword_search(query):
    async with my_db.get_async_pool().connection() as aconn:
        async with aconn.cursor() as cur:
            # query = ...  keyword search logic
            await cur.execute(query)
            documents = await cur.fetchall()
            column_names = [desc[0] for desc in cur.description]
    return [dict(zip(column_names, doc)) for doc in documents]

async def semantic_search(query):
    async with my_db.get_async_pool().connection() as aconn:
        async with aconn.cursor() as cur:
            # query = ...  semantic search logic
            await cur.execute(query)
            documents = await cur.fetchall()
            column_names = [desc[0] for desc in cur.description]
    return [dict(zip(column_names, doc)) for doc in documents]

async def full_search(query):
    ks, ss = await asyncio.gather(keyword_search(query), semantic_search(query))
    return ks + ss
  1. Flask 应用程序:
from extensions import my_db
from search import full_search

@bp.route('/search', methods=['GET'])
async def search_fn():
    query = request.args.get('query')
    for _ in range(2):
        try:
            results = await full_search(query)
            return jsonify(results)
        except psycopg_pool.PoolTimeout as e:
            stats = my_db.get_async_pool().get_stats()
            app.logger.error({'error': str(e), 'pool_stats': stats})
            
    return jsonify({'error': 'Database connection timeout'}), 500

其他可能相关的信息:

  • 应用程序中唯一的异步路由是上面的
    search_fn
    ;所有其他都是同步的。

提前致谢!

python postgresql flask psycopg3
1个回答
0
投票

异步上下文管理器中似乎存在错误。我做了一个修复,您可以在这篇文章中找到:Psycopg 连接池中的连接泄漏

[我还发布了另一个似乎性能更高的修复程序。]

© www.soinside.com 2019 - 2024. All rights reserved.