使用 SQLAlchemy UOW 和 starlette_admin 时出现“此会话正在配置新连接;不允许并发操作”错误

问题描述 投票:0回答:1

我尝试在fastapi项目中实现事务。 (路线 -> 服务层 -> 工作单元 -> 存储库)

这是我的UOW基地:

class BaseUnitOfWork(UnitOfWorkInterface):
    def __init__(self, session_factory: Callable[[], AsyncSession]) -> None:
        self._session_factory = session_factory

    async def __aenter__(self) -> Self:
        self.session = self._session_factory()
        self._init_repos()

        return self

    @abstractmethod
    def _init_repos(self) -> None:
        """
        Usage:
            - Inherit BaseUnitOfWork and define this method
            - use method _register_repo to attach db session to you're repositories
        Example:
            from your_repositories import User, Profile

            class MyUnitOfWork(BaseUnitOfWork):
                def _init_repos(self):
                    self.users = self._register_repo(UserRepository)
                    self.profiles = self._register_repo(ProfileRepository)
        """
        pass

    def _register_repo(self, repo: type[R]) -> R:
        return repo(self.session)
        
    async def __aexit__(
        self,
        exception: type[BaseException] | None,
        value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        if exception:
            await self.session.rollback()
        else:
            await self.session.commit()

        await self.session.close()

    async def save(self):
        await self.session.commit()

    async def undo(self):
        await self.session.rollback()

这是我的应用程序级 UOW:

class UnitOfWork(BaseUnitOfWork):
    def _init_repos(self) -> None:
        self.users = self._register_repo(UserDAL)
        self.vocabularies = self._register_repo(VocabularySetDAL)
        self.language_pairs = self._register_repo(LanguagePairDAL)

UOW 在 Service 层用作依赖:

class UserService:
    def __init__(self, uow: UnitOfWork) -> None:
        self._uow = uow

    async def get_or_create_by_id(self, id: int) -> User:
        async with self._uow as uow:
            user = await uow.users.get_or_create(id=id)

        return user
    
    async def get_by_id(self, id: int) -> User | None:
        async with self._uow as uow:
            user = await uow.users.get_by_id(id)

        return user

然后我通过以下方式创建服务组件:

users_service = UserService(UnitOfWork(async_session_maker))
vocabularies_service = VocabularyService(UnitOfWork(async_session_maker))

并通过将服务导入我的路线来使用它。

然后我调用我的常规 api 端点一切正常,但如果我打开我的 starlette 管理仪表板,我收到错误:

This session is provisioning a new connection; concurrent operations are not permitted
仅当我打开包含需要加载其他子模型的模型的编辑页面时,才会发生这种情况。

当我终止应用程序时,会显示进一步的错误:

ERROR:sqlalchemy.pool.impl.AsyncAdaptedQueuePool:The garbage collector is trying to clean up non-checked-in connection <AdaptedConnection <asyncpg.connection.Connection object at 0x112d2a020>>, which will be terminated.  Please ensure that SQLAlchemy pooled connections are returned to the pool explicitly, either by calling ``close()`` or by using appropriate context managers to manage their lifecycle.

这似乎是我在 uow 上下文管理器(SQLAlchemy

close()
)中调用的
asyncio
方法中的问题,并且 uow 创建的会话可能与在后台创建的 starlete-admin 会话(Starlete
anyio
)冲突。

我尝试将我的

close()
方法包装为
shield
异步方法:

    async def __aexit__(
        self,
        exception: type[BaseException] | None,
        value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        if exception:
            await self.session.rollback()
        else:
            await self.session.commit()

        await asyncio.shield(self.session.close())

但它并没有引起任何改变。

异常的完整信息:

ERROR:    Exception in ASGI application
  + Exception Group Traceback (most recent call last):
  |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 108, in __call__
  |     response = await self.dispatch_func(request, call_next)
  |                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette_admin/contrib/sqla/middleware.py", line 24, in dispatch
  |     return await call_next(request)
  |            ^^^^^^^^^^^^^^^^^^^^^^^^
  |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 84, in call_next
  |     raise app_exc
  |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 70, in coro
  |     await self.app(scope, receive_or_disconnect, send_no_error)
  |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/sessions.py", line 86, in __call__
  |     await self.app(scope, receive, send_wrapper)
  |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 106, in __call__
  |     async with anyio.create_task_group() as task_group:
  |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 658, in __aexit__
  |     raise BaseExceptionGroup(
  | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 108, in __call__
    |     response = await self.dispatch_func(request, call_next)
    |                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette_admin/auth.py", line 277, in dispatch
    |     await self.provider.is_authenticated(request)
    |   File "/Users/apple/Desktop/proggraming/fastbot/app/backend/admin/auth.py", line 53, in is_authenticated
    |     current_user = await AuthService.get_user_from_token(token)
    |                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/Users/apple/Desktop/proggraming/fastbot/app/backend/auth/auth.py", line 42, in get_user_from_token
    |     user = await users_service.get_by_id(user_id)
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/Users/apple/Desktop/proggraming/fastbot/app/backend/users/services.py", line 18, in get_by_id
    |     async with self._uow as uow:
    |   File "/Users/apple/Desktop/proggraming/fastbot/app/backend/db/unitofwork.py", line 71, in __aexit__
    |     await self.session.commit()
    |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/session.py", line 1011, in commit
    |     await greenlet_spawn(self.sync_session.commit)
    |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 179, in greenlet_spawn
    |     result = context.switch(*args, **kwargs)
    |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1969, in commit
    |     trans.commit(_to_root=True)
    |   File "<string>", line 2, in commit
    |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/sqlalchemy/orm/state_changes.py", line 103, in _go
    |     self._raise_for_prerequisite_state(fn.__name__, current_state)
    |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 946, in _raise_for_prerequisite_state
    |     raise sa_exc.InvalidRequestError(
    | sqlalchemy.exc.InvalidRequestError: This session is provisioning a new connection; concurrent operations are not permitted (Background on this error at: https://sqlalche.me/e/20/isce)
    +------------------------------------

During handling of the above exception, another exception occurred:

  + Exception Group Traceback (most recent call last):
  |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/uvicorn/protocols/http/h11_impl.py", line 408, in run_asgi
  |     result = await app(  # type: ignore[func-returns-value]
  |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py", line 84, in __call__
  |     return await self.app(scope, receive, send)
  |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/fastapi/applications.py", line 292, in __call__
  |     await super().__call__(scope, receive, send)
  |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/applications.py", line 122, in __call__
  |     await self.middleware_stack(scope, receive, send)
  |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/errors.py", line 184, in __call__
  |     raise exc
  |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/errors.py", line 162, in __call__
  |     await self.app(scope, receive, _send)
  |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 79, in __call__
  |     raise exc
  |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 68, in __call__
  |     await self.app(scope, receive, sender)
  |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/fastapi/middleware/asyncexitstack.py", line 20, in __call__
  |     raise e
  |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/fastapi/middleware/asyncexitstack.py", line 17, in __call__
  |     await self.app(scope, receive, send)
  |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/routing.py", line 718, in __call__
  |     await route.handle(scope, receive, send)
  |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/routing.py", line 443, in handle
  |     await self.app(scope, receive, send)
  |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/applications.py", line 122, in __call__
  |     await self.middleware_stack(scope, receive, send)
  |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/errors.py", line 184, in __call__
  |     raise exc
  |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/errors.py", line 162, in __call__
  |     await self.app(scope, receive, _send)
  |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 106, in __call__
  |     async with anyio.create_task_group() as task_group:
  |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 658, in __aexit__
  |     raise BaseExceptionGroup(
  | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/anyio/streams/memory.py", line 97, in receive
    |     return self.receive_nowait()
    |            ^^^^^^^^^^^^^^^^^^^^^
    |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/anyio/streams/memory.py", line 92, in receive_nowait
    |     raise WouldBlock
    | anyio.WouldBlock
    | 
    | During handling of the above exception, another exception occurred:
    | 
    | Traceback (most recent call last):
    |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 78, in call_next
    |     message = await recv_stream.receive()
    |               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/anyio/streams/memory.py", line 112, in receive
    |     raise EndOfStream
    | anyio.EndOfStream
    | 
    | During handling of the above exception, another exception occurred:
    | 
    | Exception Group Traceback (most recent call last):
    |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 108, in __call__
    |     response = await self.dispatch_func(request, call_next)
    |                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette_admin/contrib/sqla/middleware.py", line 24, in dispatch
    |     return await call_next(request)
    |            ^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 84, in call_next
    |     raise app_exc
    |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 70, in coro
    |     await self.app(scope, receive_or_disconnect, send_no_error)
    |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/sessions.py", line 86, in __call__
    |     await self.app(scope, receive, send_wrapper)
    |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 106, in __call__
    |     async with anyio.create_task_group() as task_group:
    |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 658, in __aexit__
    |     raise BaseExceptionGroup(
    | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
    +-+---------------- 1 ----------------
      | Traceback (most recent call last):
      |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 108, in __call__
      |     response = await self.dispatch_func(request, call_next)
      |                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette_admin/auth.py", line 277, in dispatch
      |     await self.provider.is_authenticated(request)
      |   File "/Users/apple/Desktop/proggraming/fastbot/app/backend/admin/auth.py", line 53, in is_authenticated
      |     current_user = await AuthService.get_user_from_token(token)
      |                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      |   File "/Users/apple/Desktop/proggraming/fastbot/app/backend/auth/auth.py", line 42, in get_user_from_token
      |     user = await users_service.get_by_id(user_id)
      |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      |   File "/Users/apple/Desktop/proggraming/fastbot/app/backend/users/services.py", line 18, in get_by_id
      |     async with self._uow as uow:
      |   File "/Users/apple/Desktop/proggraming/fastbot/app/backend/db/unitofwork.py", line 71, in __aexit__
      |     await self.session.commit()
      |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/session.py", line 1011, in commit
      |     await greenlet_spawn(self.sync_session.commit)
      |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 179, in greenlet_spawn
      |     result = context.switch(*args, **kwargs)
      |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1969, in commit
      |     trans.commit(_to_root=True)
      |   File "<string>", line 2, in commit
      |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/sqlalchemy/orm/state_changes.py", line 103, in _go
      |     self._raise_for_prerequisite_state(fn.__name__, current_state)
      |   File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 946, in _raise_for_prerequisite_state
      |     raise sa_exc.InvalidRequestError(
      | sqlalchemy.exc.InvalidRequestError: This session is provisioning a new connection; concurrent operations are not permitted (Background on this error at: https://sqlalche.me/e/20/isce)
sqlalchemy fastapi unit-of-work starlette
1个回答
0
投票

当我在并行运行的多个协程中使用单个

session
(单个
uow
)时,我在相同的架构下遇到了同样的问题

原因

出现此异常是因为并发访问同一个会话并且会话引用可以被覆盖

让我们重点关注这部分代码:

async def get_or_create_by_id(self, id: int) -> User:
    async with self._uow as uow:
        user = await uow.users.get_or_create(id=id)
    ...

想象两个用户同时更新一些信息。这意味着它们将共享相同的

users_service
-> 共享相同的
uow
-> 都将在同一个 UOW 对象上调用
BaseUnitOfWork.__aenter__
。如果有一定的机会,工作流程可能是下一个:

[coro-1] __aenter__, create and set new session#1
[coro-1] uow.users.get_or_create, using session#1
[coro-2] __aenter__, create and set new session#2  <-- now ref to session#1 is lost
[coro-2] uow.users.get_or_create, using session#2
[coro-1] __aexit__, using session#2
[coro-2] __aexit__, using session#2
...
session#1 is never committed and closed

也有可能两个协程都与

session#2

一起使用的情况

快速解决方案:

您可以为每个用户请求创建新的

uow
以避免出现此异常。 但请注意,在某些情况下这可能会导致更多的数据库连接和性能下降

对于 FastAPI,这将如下所示:

class BaseUnitOfWork(UnitOfWorkInterface):
    def __init__(self, session: AsyncSession):
        self.session = session
        ...

async def get_session() -> AsyncGenerator[AsyncSession, None]:
    async with async_session_maker() as session:
        yield session

async def get_uow(session: AsyncSession = Depends(get_session)) -> AsyncGenerator[UnitOfWork, None]:
    async with UnitOfWork(session) as uow:
        yield uow

def get_user_service(uow: UnitOfWork = Depends(get_uow)) -> UserService:
    return UserService(uow)

@router.get('/users/{id}')
async def get_user(id: int, user_service: UserService = Depends(get_user_service)) -> User:
    return await user_service.get_by_id(id)
© www.soinside.com 2019 - 2024. All rights reserved.