使用下面的代码,如果不重置 ContextVar 中设置的 SqlAlchemy 会话,会发生什么情况?
from contextvars import ContextVar
from fastapi import FastAPI, BackgroundTasks
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
engine = create_async_engine("postgresql+asyncpg://postgres:postgres@localhost:5433/postgres", isolation_level="AUTOCOMMIT")
session_context = ContextVar("session")
app = FastAPI()
class Repository:
async def get_one(self):
return await self.execute(text("select 1"))
async def get_two(self):
return await self.execute(text("select 2"))
async def execute(self, statement):
try:
session = session_context.get()
except LookupError:
session = AsyncSession(engine)
session_context.set(session)
print(session)
result = (await session.execute(statement)).scalar()
await session.close() # for some reason I need to close session every time
return result
async def check_connections_statuses():
print(engine.pool.status())
print(session_context.get())
@app.get("/")
async def main(background_tasks: BackgroundTasks):
repo = Repository()
print(await repo.get_one())
print(await repo.get_two())
background_tasks.add_task(check_connections_statuses)
代码重现输出:
<sqlalchemy.ext.asyncio.session.AsyncSession object at 0x121bb31a0>
1
<sqlalchemy.ext.asyncio.session.AsyncSession object at 0x121bb31a0>
2
INFO: 127.0.0.1:59836 - "GET / HTTP/1.1" 200 OK
Pool size: 5 Connections in pool: 1 Current Overflow: -4 Current Checked out connections: 0
<sqlalchemy.ext.asyncio.session.AsyncSession object at 0x121bb31a0>
看来连接没有泄漏。但是 ContextVar 中的会话对象呢?正如你所看到的,它仍然在 ContextVar 中。难道是保存不重置吗?或者 Python 以某种方式重置它?
是的,当前 HTTP 视图完成后就会重置。在上面的示例中,当协同例程
main
返回时,就会发生这种情况。
Contextvars 添加到消息中,以便并发异步任务可以“查看”独立的上下文信息,即使任务运行相同的函数(想想并行运行您的
main
视图的多个 HTTP 请求)。在 asyncio 中创建并发任务的行为(FastAPI 在调用 main
之前为您执行)会自动分叉上下文,并且该任务将在新上下文中运行。任务完成后,分叉的上下文将被释放(除非在任务处于活动状态时使用 contextvars.copy_context()
调用等方式显式保存)