如何使用可重用的异步函数或生成器干掉这个 psycopg 连接池样板代码?

问题描述 投票:0回答:1

我正在使用

psycopg
使用连接池连接到 PostgreSQL 数据库。它工作得很好,但是任何需要在事务中运行 SQL 的函数都会得到三层额外的嵌套:

/app/db.py

from os import getenv
from psycopg_pool import AsyncConnectionPool

pool = AsyncConnectionPool(getenv('POSTGRES_URL'))

/app/foo.py

from db import pool
from psycopg.rows import dict_row


async def create_foo(**kwargs):
    foo = {}
    async with pool.connection() as conn:
        async with conn.transaction():
            async with conn.cursor(row_factory=dict_row) as cur:
                # use cursor to execute SQL queries
    return foo


async def update_foo(foo_id, **kwargs):
    foo = {}
    async with pool.connection() as conn:
        async with conn.transaction():
            async with conn.cursor(row_factory=dict_row) as cur:
                # use cursor to execute SQL queries
    return foo

我想把它抽象成一个辅助函数,所以我尝试重构它:

/app/db.py

from contextlib import asynccontextmanager
from os import getenv
from psycopg_pool import AsyncConnectionPool

pool = AsyncConnectionPool(getenv('POSTGRES_URL'))


@asynccontextmanager
async def get_tx_cursor(**kwargs):
    async with pool.connection() as conn:
        conn.transaction()
        cur = conn.cursor(**kwargs)
        yield cur

...并这样称呼它:

/app/foo.py

from db import get_tx_cursor
from psycopg.rows import dict_row


async def create_foo(**kwargs):
    foo = {}
    async with get_tx_cursor(row_factory=dict_row) as cur:
        # use cursor to execute SQL queries
    return foo

...但这导致了一个错误:

TypeError: '_AsyncGeneratorContextManager' object does not support the context manager protocol

我也试过上面的变体,像这样:

async def get_tx_cursor(**kwargs):
    async with pool.connection() as conn:
        async with conn.transaction():
            async with conn.cursor(**kwargs) as cur:
                yield cur

...但得到了类似的结果,所以使用生成器似乎是不可能的。

有谁知道在不使用其他库的情况下将光标暴露给调用函数的简洁方法?

以下是我使用的版本:

  • 蟒蛇: 3.11
  • psycopg: 3.1.8
  • psycopg-池: 3.1.6
python python-asyncio generator psycopg2 psycopg3
1个回答
0
投票

你在

asynccontextmanager
装饰器的正确轨道上,但是你忘记了使用
async with
语句进行事务和
get_tx_cursor
函数内的游标。

这是您的

/app/db.py
的更正版本:

from contextlib import asynccontextmanager
from os import getenv
from psycopg_pool import AsyncConnectionPool

pool = AsyncConnectionPool(getenv('POSTGRES_URL'))


@asynccontextmanager
async def get_tx_cursor(**kwargs):
    async with pool.connection() as conn:
        async with conn.transaction():
            async with conn.cursor(**kwargs) as cur:
                yield cur

现在您可以按预期在

get_tx_cursor
中使用
/app/foo.py
功能:

from db import get_tx_cursor
from psycopg.rows import dict_row

async def create_foo(**kwargs):
    foo = {}
    async with get_tx_cursor(row_factory=dict_row) as cur:
        # use cursor to execute SQL queries
    return foo

这应该没有任何错误,您也可以在应用程序的其他部分重用

get_tx_cursor
函数。

© www.soinside.com 2019 - 2024. All rights reserved.