我尝试在 FastAPI 项目中实现事务(路由 -> 服务层 -> 工作单元 -> 存储库)。
这是我的 UOW 基类:
class BaseUnitOfWork(UnitOfWorkInterface):
    """Async unit of work that owns one database session per ``async with`` block.

    Entering the context creates a session from the factory and builds the
    repositories on top of it; leaving the context commits (or rolls back when
    an exception escaped the block) and then closes the session.

    The session and the repositories are stored as attributes of the instance,
    so every ``__aenter__`` call overwrites the previous ones. A single
    instance must therefore not be entered from several coroutines at once.
    """

    def __init__(self, session_factory: Callable[[], AsyncSession]) -> None:
        """Store the factory; no session is created until the context is entered.

        Args:
            session_factory: Zero-argument callable returning a new session.
        """
        self._session_factory = session_factory

    async def __aenter__(self) -> Self:
        """Create a fresh session, attach the repositories to it and return self."""
        self.session = self._session_factory()
        self._init_repos()
        return self

    @abstractmethod
    def _init_repos(self) -> None:
        """Create the repositories used by this unit of work.

        Usage:
            - Inherit BaseUnitOfWork and define this method.
            - Use ``_register_repo`` to attach the db session to your
              repositories.

        Example:
            from your_repositories import UserRepository, ProfileRepository

            class MyUnitOfWork(BaseUnitOfWork):
                def _init_repos(self):
                    self.users = self._register_repo(UserRepository)
                    self.profiles = self._register_repo(ProfileRepository)
        """

    def _register_repo(self, repo: type[R]) -> R:
        """Instantiate ``repo`` with the current session and return it."""
        return repo(self.session)

    async def __aexit__(
        self,
        exception: type[BaseException] | None,
        value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Finish the transaction and always close the session.

        Rolls back when the block raised, commits otherwise. Returns ``None``,
        so an exception raised inside the block is never suppressed.
        """
        try:
            if exception is not None:
                await self.session.rollback()
            else:
                await self.session.commit()
        finally:
            # close() must run even if commit()/rollback() raised; otherwise
            # the session is never closed on that path.
            await self.session.close()

    async def save(self) -> None:
        """Commit the current session."""
        await self.session.commit()

    async def undo(self) -> None:
        """Roll back the current session."""
        await self.session.rollback()
这是我的应用程序级 UOW:
class UnitOfWork(BaseUnitOfWork):
    """Application-level unit of work exposing the project's repositories."""

    def _init_repos(self) -> None:
        """Build one repository per data-access class via ``_register_repo``."""
        attach = self._register_repo
        self.users = attach(UserDAL)
        self.vocabularies = attach(VocabularySetDAL)
        self.language_pairs = attach(LanguagePairDAL)
UOW 在 Service 层用作依赖:
class UserService:
    """User operations executed through a unit of work."""

    def __init__(self, uow: UnitOfWork) -> None:
        """Keep the unit of work that every method enters for database access."""
        self._uow = uow

    async def get_or_create_by_id(self, id: int) -> User:
        """Return the result of the users repository's ``get_or_create`` for ``id``.

        The repository call runs inside the unit-of-work context; the result
        is returned after that context has exited.
        """
        async with self._uow as work:
            fetched = await work.users.get_or_create(id=id)
        return fetched

    async def get_by_id(self, id: int) -> User | None:
        """Return the result of the users repository's ``get_by_id`` for ``id``.

        The repository call runs inside the unit-of-work context; the result
        is returned after that context has exited.
        """
        async with self._uow as work:
            fetched = await work.users.get_by_id(id)
        return fetched
然后我通过以下方式创建服务组件:
# Module-level service instances; each one wraps its own UnitOfWork built on
# the shared async_session_maker.
# NOTE(review): BaseUnitOfWork.__aenter__ stores the session on the UnitOfWork
# instance, so concurrent calls through one of these services would share and
# overwrite it — confirm whether a per-request UnitOfWork is needed.
users_service = UserService(UnitOfWork(async_session_maker))
vocabularies_service = VocabularyService(UnitOfWork(async_session_maker))
并通过将这些服务导入我的路由来使用它们。
调用常规的 API 端点时一切正常,但当我打开 starlette-admin 管理面板时,会收到如下错误:
This session is provisioning a new connection; concurrent operations are not permitted
仅当我打开包含需要加载其他子模型的模型的编辑页面时,才会发生这种情况。
当我终止应用程序时,会显示进一步的错误:
ERROR:sqlalchemy.pool.impl.AsyncAdaptedQueuePool:The garbage collector is trying to clean up non-checked-in connection <AdaptedConnection <asyncpg.connection.Connection object at 0x112d2a020>>, which will be terminated. Please ensure that SQLAlchemy pooled connections are returned to the pool explicitly, either by calling ``close()`` or by using appropriate context managers to manage their lifecycle.
这似乎是我在 uow 上下文管理器中调用的 SQLAlchemy(asyncio)`close()` 方法的问题,并且 uow 创建的会话可能与 starlette-admin 在后台创建的会话(Starlette 基于 anyio)冲突。
我尝试用 `asyncio.shield` 包装对 `close()` 方法的调用:
async def __aexit__(
    self,
    exception: type[BaseException] | None,
    value: BaseException | None,
    traceback: TracebackType | None,
) -> None:
    """Finish the transaction and always close the session.

    Rolls back when the block raised, commits otherwise. Returns ``None``,
    so an exception raised inside the block is never suppressed.
    """
    try:
        if exception is not None:
            await self.session.rollback()
        else:
            await self.session.commit()
    finally:
        # close() must run even if commit()/rollback() raised; it stays
        # shielded so cancelling the awaiting task does not cancel close().
        await asyncio.shield(self.session.close())
但它并没有引起任何改变。
异常的完整信息:
ERROR: Exception in ASGI application
+ Exception Group Traceback (most recent call last):
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 108, in __call__
| response = await self.dispatch_func(request, call_next)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette_admin/contrib/sqla/middleware.py", line 24, in dispatch
| return await call_next(request)
| ^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 84, in call_next
| raise app_exc
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 70, in coro
| await self.app(scope, receive_or_disconnect, send_no_error)
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/sessions.py", line 86, in __call__
| await self.app(scope, receive, send_wrapper)
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 106, in __call__
| async with anyio.create_task_group() as task_group:
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 658, in __aexit__
| raise BaseExceptionGroup(
| ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 108, in __call__
| response = await self.dispatch_func(request, call_next)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette_admin/auth.py", line 277, in dispatch
| await self.provider.is_authenticated(request)
| File "/Users/apple/Desktop/proggraming/fastbot/app/backend/admin/auth.py", line 53, in is_authenticated
| current_user = await AuthService.get_user_from_token(token)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/apple/Desktop/proggraming/fastbot/app/backend/auth/auth.py", line 42, in get_user_from_token
| user = await users_service.get_by_id(user_id)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/apple/Desktop/proggraming/fastbot/app/backend/users/services.py", line 18, in get_by_id
| async with self._uow as uow:
| File "/Users/apple/Desktop/proggraming/fastbot/app/backend/db/unitofwork.py", line 71, in __aexit__
| await self.session.commit()
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/session.py", line 1011, in commit
| await greenlet_spawn(self.sync_session.commit)
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 179, in greenlet_spawn
| result = context.switch(*args, **kwargs)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1969, in commit
| trans.commit(_to_root=True)
| File "<string>", line 2, in commit
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/sqlalchemy/orm/state_changes.py", line 103, in _go
| self._raise_for_prerequisite_state(fn.__name__, current_state)
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 946, in _raise_for_prerequisite_state
| raise sa_exc.InvalidRequestError(
| sqlalchemy.exc.InvalidRequestError: This session is provisioning a new connection; concurrent operations are not permitted (Background on this error at: https://sqlalche.me/e/20/isce)
+------------------------------------
During handling of the above exception, another exception occurred:
+ Exception Group Traceback (most recent call last):
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/uvicorn/protocols/http/h11_impl.py", line 408, in run_asgi
| result = await app( # type: ignore[func-returns-value]
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py", line 84, in __call__
| return await self.app(scope, receive, send)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/fastapi/applications.py", line 292, in __call__
| await super().__call__(scope, receive, send)
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/applications.py", line 122, in __call__
| await self.middleware_stack(scope, receive, send)
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/errors.py", line 184, in __call__
| raise exc
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/errors.py", line 162, in __call__
| await self.app(scope, receive, _send)
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 79, in __call__
| raise exc
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 68, in __call__
| await self.app(scope, receive, sender)
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/fastapi/middleware/asyncexitstack.py", line 20, in __call__
| raise e
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/fastapi/middleware/asyncexitstack.py", line 17, in __call__
| await self.app(scope, receive, send)
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/routing.py", line 718, in __call__
| await route.handle(scope, receive, send)
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/routing.py", line 443, in handle
| await self.app(scope, receive, send)
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/applications.py", line 122, in __call__
| await self.middleware_stack(scope, receive, send)
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/errors.py", line 184, in __call__
| raise exc
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/errors.py", line 162, in __call__
| await self.app(scope, receive, _send)
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 106, in __call__
| async with anyio.create_task_group() as task_group:
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 658, in __aexit__
| raise BaseExceptionGroup(
| ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/anyio/streams/memory.py", line 97, in receive
| return self.receive_nowait()
| ^^^^^^^^^^^^^^^^^^^^^
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/anyio/streams/memory.py", line 92, in receive_nowait
| raise WouldBlock
| anyio.WouldBlock
|
| During handling of the above exception, another exception occurred:
|
| Traceback (most recent call last):
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 78, in call_next
| message = await recv_stream.receive()
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/anyio/streams/memory.py", line 112, in receive
| raise EndOfStream
| anyio.EndOfStream
|
| During handling of the above exception, another exception occurred:
|
| Exception Group Traceback (most recent call last):
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 108, in __call__
| response = await self.dispatch_func(request, call_next)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette_admin/contrib/sqla/middleware.py", line 24, in dispatch
| return await call_next(request)
| ^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 84, in call_next
| raise app_exc
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 70, in coro
| await self.app(scope, receive_or_disconnect, send_no_error)
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/sessions.py", line 86, in __call__
| await self.app(scope, receive, send_wrapper)
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 106, in __call__
| async with anyio.create_task_group() as task_group:
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 658, in __aexit__
| raise BaseExceptionGroup(
| ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 108, in __call__
| response = await self.dispatch_func(request, call_next)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette_admin/auth.py", line 277, in dispatch
| await self.provider.is_authenticated(request)
| File "/Users/apple/Desktop/proggraming/fastbot/app/backend/admin/auth.py", line 53, in is_authenticated
| current_user = await AuthService.get_user_from_token(token)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/apple/Desktop/proggraming/fastbot/app/backend/auth/auth.py", line 42, in get_user_from_token
| user = await users_service.get_by_id(user_id)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/apple/Desktop/proggraming/fastbot/app/backend/users/services.py", line 18, in get_by_id
| async with self._uow as uow:
| File "/Users/apple/Desktop/proggraming/fastbot/app/backend/db/unitofwork.py", line 71, in __aexit__
| await self.session.commit()
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/session.py", line 1011, in commit
| await greenlet_spawn(self.sync_session.commit)
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 179, in greenlet_spawn
| result = context.switch(*args, **kwargs)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1969, in commit
| trans.commit(_to_root=True)
| File "<string>", line 2, in commit
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/sqlalchemy/orm/state_changes.py", line 103, in _go
| self._raise_for_prerequisite_state(fn.__name__, current_state)
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 946, in _raise_for_prerequisite_state
| raise sa_exc.InvalidRequestError(
| sqlalchemy.exc.InvalidRequestError: This session is provisioning a new connection; concurrent operations are not permitted (Background on this error at: https://sqlalche.me/e/20/isce)
当我在并发运行的多个协程中使用同一个 `session`(同一个 `uow`)时,我在相同的架构下遇到了同样的问题。
出现此异常是因为多个协程并发访问了同一个会话,而且保存在 uow 上的会话引用可能被覆盖。
让我们重点关注这部分代码:
async def get_or_create_by_id(self, id: int) -> User:
async with self._uow as uow:
user = await uow.users.get_or_create(id=id)
...
想象两个用户同时更新一些信息。这意味着这两个请求将共享同一个 `users_service` -> 共享同一个 `uow` -> 都会在同一个 UOW 对象上调用 `BaseUnitOfWork.__aenter__`。在某些时序下,执行流程可能如下:
[coro-1] __aenter__, create and set new session#1
[coro-1] uow.users.get_or_create, using session#1
[coro-2] __aenter__, create and set new session#2 <-- now ref to session#1 is lost
[coro-2] uow.users.get_or_create, using session#2
[coro-1] __aexit__, using session#2
[coro-2] __aexit__, using session#2
...
session#1 is never committed and closed
也可能出现两个协程同时使用 `session#2` 的情况。
您可以为每个用户请求创建新的 `uow` 以避免出现此异常。但请注意,在某些情况下这可能会导致更多的数据库连接和性能下降。
对于 FastAPI,这将如下所示:
class BaseUnitOfWork(UnitOfWorkInterface):
def __init__(self, session: AsyncSession):
self.session = session
...
async def get_session() -> AsyncGenerator[AsyncSession, None]:
    """Yield a session obtained from ``async_session_maker``.

    The session is used as an async context manager, so its context is
    exited once the generator is resumed after the yield.
    """
    async with async_session_maker() as db_session:
        yield db_session
async def get_uow(session: AsyncSession = Depends(get_session)) -> AsyncGenerator[UnitOfWork, None]:
    """Yield a ``UnitOfWork`` entered on the injected session.

    The unit-of-work context is exited once the generator is resumed after
    the yield.
    """
    async with UnitOfWork(session) as unit:
        yield unit
def get_user_service(uow: UnitOfWork = Depends(get_uow)) -> UserService:
    """Build a ``UserService`` around the injected unit of work."""
    service = UserService(uow)
    return service
@router.get('/users/{id}')
async def get_user(id: int, user_service: UserService = Depends(get_user_service)) -> User:
    """Return the user with the given id.

    Raises:
        HTTPException: 404 when the service finds no user. ``get_by_id``
            returns ``User | None``, while this endpoint is declared to
            return ``User``.
    """
    # Local import keeps this block self-contained; the file-level import
    # block is not visible from here.
    from fastapi import HTTPException

    user = await user_service.get_by_id(id)
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return user