我正在使用
psycopg
使用连接池连接到 PostgreSQL 数据库。它工作得很好,但是任何需要在事务中运行 SQL 的函数都会得到三层额外的嵌套:
/app/db.py
from os import getenv
from psycopg_pool import AsyncConnectionPool
pool = AsyncConnectionPool(getenv('POSTGRES_URL'))
/app/foo.py
from db import pool
from psycopg.rows import dict_row
async def create_foo(**kwargs):
foo = {}
async with pool.connection() as conn:
async with conn.transaction():
async with conn.cursor(row_factory=dict_row) as cur:
# use cursor to execute SQL queries
return foo
async def update_foo(foo_id, **kwargs):
foo = {}
async with pool.connection() as conn:
async with conn.transaction():
async with conn.cursor(row_factory=dict_row) as cur:
# use cursor to execute SQL queries
return foo
我想把它抽象成一个辅助函数,所以我尝试重构它:
/app/db.py
from contextlib import asynccontextmanager
from os import getenv
from psycopg_pool import AsyncConnectionPool
pool = AsyncConnectionPool(getenv('POSTGRES_URL'))
@asynccontextmanager
async def get_tx_cursor(**kwargs):
async with pool.connection() as conn:
conn.transaction()
cur = conn.cursor(**kwargs)
yield cur
...并这样称呼它:
/app/foo.py
from db import get_tx_cursor
from psycopg.rows import dict_row
async def create_foo(**kwargs):
foo = {}
async with get_tx_cursor(row_factory=dict_row) as cur:
# use cursor to execute SQL queries
return foo
...但这导致了一个错误:
TypeError: '_AsyncGeneratorContextManager' object does not support the context manager protocol
我也试过上面的变体,像这样:
async def get_tx_cursor(**kwargs):
async with pool.connection() as conn:
async with conn.transaction():
async with conn.cursor(**kwargs) as cur:
yield cur
...但得到了类似的结果,所以使用生成器似乎是不可能的。
有谁知道在不使用其他库的情况下将光标暴露给调用函数的简洁方法?
以下是我使用的版本:
你在
asynccontextmanager
装饰器的正确轨道上,但是你忘记了使用 async with
语句进行事务和 get_tx_cursor
函数内的游标。
这是您的
/app/db.py
的更正版本:
from contextlib import asynccontextmanager
from os import getenv
from psycopg_pool import AsyncConnectionPool
pool = AsyncConnectionPool(getenv('POSTGRES_URL'))
@asynccontextmanager
async def get_tx_cursor(**kwargs):
async with pool.connection() as conn:
async with conn.transaction():
async with conn.cursor(**kwargs) as cur:
yield cur
现在您可以按预期在
get_tx_cursor
中使用/app/foo.py
功能:
from db import get_tx_cursor
from psycopg.rows import dict_row
async def create_foo(**kwargs):
foo = {}
async with get_tx_cursor(row_factory=dict_row) as cur:
# use cursor to execute SQL queries
return foo
这应该没有任何错误,您也可以在应用程序的其他部分重用
get_tx_cursor
函数。