在使用 Sanic 和 Asyncmy (MySQL) 查询期间与 MySQL 服务器失去连接

问题描述 投票:0回答:0

我正面临一个我很难确定的问题。

我实现了一个数据库上下文系统,把每个请求包装在 `with` 块中,并在其中创建到 MySQL 的连接。完整代码如下:

custom/database/database.py

# -*- coding:utf-8 -*-

from sqlalchemy import exc, event
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession as SQLAlchemyAsyncSession
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.pool import Pool, QueuePool  # NullPool
from sqlalchemy.exc import OperationalError
from contextvars import ContextVar
from sanic import Sanic


class EngineNotInitialisedError(Exception):
    """Exception type named for use of a database engine before it is set up.

    NOTE(review): nothing in the code shown here raises it — confirm intended usage.
    """


class DBSessionContext:
    """Hold the lazily created read and write sessions for one unit of work.

    Sessions are only created the first time ``read`` / ``write`` is accessed,
    and ``close()`` finalises and releases whichever ones were created.

    Args:
        read_session: Callable (session factory) returning a new read session.
        write_session: Callable (session factory) returning a new write session.
        commit_on_exit: When true (the default), ``close()`` commits the write
            session if no failing exception was reported. When false, the
            write session is closed without an explicit commit.
    """

    # MySQL client error code for "Lost connection to MySQL server during query".
    _LOST_CONNECTION = 2013

    def __init__(self, read_session: "sessionmaker", write_session: "sessionmaker",
                 commit_on_exit: bool = True) -> None:
        self.read_session = read_session
        self.write_session = write_session
        self.commit_on_exit = commit_on_exit

        # Token returned by ContextVar.set(); stored through set_token().
        self.token = None
        self._read = None
        self._write = None

    def _disable_flush(self, *args, **kwargs):
        """Replacement for ``flush`` on the read session; always raises."""
        raise NotImplementedError('Unable to flush a read-only session.')

    async def _close_session(self, session) -> None:
        """Close ``session`` if it exists, tolerating an already-lost connection.

        Raises:
            OperationalError: for any driver error other than code 2013.
        """
        if session is None:
            return

        try:
            await session.close()
        except OperationalError as e:
            # Read the driver error code defensively: ``orig`` may be missing
            # or carry no args, in which case the error is not ignorable.
            orig_args = getattr(getattr(e, 'orig', None), 'args', None) or ()
            if not orig_args or orig_args[0] != self._LOST_CONNECTION:
                raise

    async def close(self, exc_type=None, exc_value=None, traceback=None):
        """Finalise the write transaction, then close both sessions.

        The write session is rolled back when ``exc_value`` is set and its
        ``status_code`` attribute (500 when absent) is greater than 300.
        Otherwise it is committed, provided ``commit_on_exit`` is true.

        Both sessions are closed even if the commit or rollback raises, so a
        failed transaction cannot leave a session unclosed.
        """
        try:
            if self._write is not None:
                failed = exc_value is not None and getattr(exc_value, 'status_code', 500) > 300
                if failed:
                    await self._write.rollback()
                elif self.commit_on_exit:
                    await self._write.commit()
        finally:
            # Nested so that a failure closing the write session still lets
            # the read session be closed.
            try:
                await self._close_session(self._write)
            finally:
                await self._close_session(self._read)

    def set_token(self, token):
        """Remember the ContextVar token needed to reset the context later."""
        self.token = token

    @property
    def read(self) -> "Session":
        """Return the read session, creating it on first access."""
        if self._read is None:
            self._read = self.read_session()
            # Any attempt to flush through the read session raises instead.
            self._read.flush = self._disable_flush

        return self._read

    @property
    def write(self) -> "Session":
        """Return the write session, creating it on first access."""
        if self._write is None:
            self._write = self.write_session()

        return self._write


class AsyncSession(SQLAlchemyAsyncSession):
    """Async session whose query helpers take bind parameters as keyword arguments."""

    async def execute(self, statement, **parameters):
        """Run ``statement`` through the parent ``execute``.

        All keyword arguments are forwarded as one parameter dict.

        NOTE(review): because every keyword lands in that dict, callers cannot
        pass other ``execute`` options by keyword through this override —
        confirm nothing relies on that.
        """
        parent_execute = super().execute
        return await parent_execute(statement, parameters)

    async def first(self, statement, **parameters):
        """Execute ``statement`` and return ``.first()`` of the result."""
        return (await self.execute(statement, **parameters)).first()

    async def all(self, statement, **parameters):
        """Execute ``statement`` and return ``.all()`` of the result."""
        return (await self.execute(statement, **parameters)).all()


class DBSession:
    """Async context manager exposing per-context read and write sessions.

    ``init_app`` builds the read and write engines and session factories from
    the application config. Entering the manager stores a fresh
    ``DBSessionContext`` in a ``ContextVar``; ``read`` and ``write`` resolve
    to the sessions of whichever context is currently active.
    """

    def __init__(self):
        self.app = None
        self.read_engine = None
        self.read_session = None
        self.write_engine = None
        self.write_session = None
        self._session = None
        self.context = ContextVar("context", default=None)
        self.commit_on_exit = True

    def _create_engine(self, url, engine_args):
        """Create an async engine for ``url`` with the shared pool settings."""
        return create_async_engine(
            url,
            connect_args={
                'connect_timeout': self.app.config.get('DATABASE_CONNECT_TIMEOUT', cast=int, default=3)
            },
            **engine_args
        )

    def _current(self):
        """Return the active session context.

        Raises:
            RuntimeError: if no context is active (not inside ``async with``).
        """
        session_ctx = self.context.get()
        if session_ctx is None:
            raise RuntimeError('No active database context; enter "async with" on this DBSession first.')

        return session_ctx

    def init_app(self, app: "Sanic") -> None:
        """Read the database settings from ``app.config`` and build engines and session factories."""
        self.app = app
        self.commit_on_exit = self.app.config.get('DATABASE_COMMIT_ON_EXIT', cast=bool, default=True)

        engine_args = {
            'echo': self.app.config.get('DATABASE_ECHO', cast=bool, default=False),
            'echo_pool': self.app.config.get('DATABASE_ECHO_POOL', cast=bool, default=False),
            # will be used to create a connection pool instance using the connection parameters given in the URL
            # NOTE(review): SQLAlchemy's asyncio documentation describes AsyncAdaptedQueuePool as the
            # pool for async engines — confirm QueuePool is intended here.
            'poolclass': QueuePool,

            # if True will enable the connection pool "pre-ping" feature that tests connections for liveness upon each checkout
            'pool_pre_ping': self.app.config.get('DATABASE_POOL_PRE_PING', cast=bool, default=True),
            # the number of connections to allow in connection pool "overflow"
            'max_overflow': self.app.config.get('DATABASE_MAX_OVERFLOW', cast=int, default=10),
            # the number of connections to keep open inside the connection pool
            'pool_size': self.app.config.get('DATABASE_POOL_SIZE', cast=int, default=100),
            # this setting causes the pool to recycle connections after the given number of seconds has passed
            'pool_recycle': self.app.config.get('DATABASE_POOL_RECYCLE', cast=int, default=3600),
            # number of seconds to wait before giving up on getting a connection from the pool
            'pool_timeout': self.app.config.get('DATABASE_POOL_TIMEOUT', cast=int, default=5),
        }

        self.read_engine = self._create_engine(self.app.config.get('DATABASE_READ_URL'), engine_args)

        # @see https://writeonly.wordpress.com/2009/07/16/simple-read-only-sqlalchemy-sessions/
        self.read_session = sessionmaker(
            bind=self.read_engine,
            expire_on_commit=False,
            class_=AsyncSession,
            autoflush=False,
            autocommit=False
        )

        self.write_engine = self._create_engine(self.app.config.get('DATABASE_WRITE_URL'), engine_args)

        self.write_session = sessionmaker(
            bind=self.write_engine,
            expire_on_commit=False,
            class_=AsyncSession,
            autoflush=True
        )

    async def __aenter__(self):
        """Create a new session context, make it the active one and return it."""
        session_ctx = DBSessionContext(self.read_session, self.write_session, self.commit_on_exit)
        session_ctx.set_token(self.context.set(session_ctx))

        return session_ctx

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Close the active session context and restore the previous one.

        The context variable is reset even when closing raises, so a failed
        commit or close cannot leave a stale session context active.
        """
        session_ctx = self._current()
        try:
            await session_ctx.close(exc_type, exc_value, traceback)
        finally:
            self.context.reset(session_ctx.token)

    @property
    def read(self) -> "Session":
        """Read session of the active context."""
        return self._current().read

    @property
    def write(self) -> "Session":
        """Write session of the active context."""
        return self._current().write


@event.listens_for(Pool, "checkout")
def check_connection(dbapi_con, con_record, con_proxy):
    '''Listener for Pool checkout events that pings every connection before using.
    Implements pessimistic disconnect handling strategy. See also:
    http://docs.sqlalchemy.org/en/rel_0_8/core/pooling.html#disconnect-handling-pessimistic

    Raises ``exc.DisconnectionError`` when the ping fails with one of the MySQL
    "connection gone" codes; any other failure is re-raised unchanged.'''

    disconnect_codes = (2006,   # MySQL server has gone away
                        2013,   # Lost connection to MySQL server during query
                        2055)   # Lost connection to MySQL server at '%s', system error: %d

    cursor = dbapi_con.cursor()
    try:
        cursor.execute("SELECT 1")
    except Exception as ex:
        # The cursor belongs to the raw DBAPI connection, so the driver's own
        # exception class is raised here and carries the MySQL error code as
        # args[0]. Catching only sqlalchemy.exc.OperationalError would miss it.
        try:
            cursor.close()
        except Exception:
            pass  # connection is already unusable; the ping failure is what matters

        code = ex.args[0] if ex.args else None
        if code in disconnect_codes:
            # caught by pool, which will retry with a new connection
            raise exc.DisconnectionError() from ex
        raise

    cursor.close()


# Module-level DBSession instance.
db = DBSession()

用法很简单:在路由层,我写了一个包装器,它在已初始化的数据库上下文中调用处理函数:

custom/route.py

class Route:
    """Callable route wrapper that runs the handler inside a database context."""

    async def __call__(self, request: Request, **kwargs):
        """Call ``self.handler`` inside ``async with db`` and return its response.

        The handler receives the request and the keyword arguments given to
        this call.
        """
        async with db:
            response = await self.handler(request, **kwargs)

            # process the response, such as changing a str to a text response, etc

        return response

不幸的是,我注意到出现了大量如下错误:

(2013, 'Lost connection to MySQL server during query')

我不知道这是怎么发生的,也不知道为什么会发生。这发生在相对较小的查询中(包含“LIMIT 1”,索引列应该很快)

这是完整的堆栈跟踪:

[2022-05-19 09:35:25 +0000] [92185] [ERROR] Exception occurred while handling uri: 'https://api.pdfshift.io/redacted'
Traceback (most recent call last):
  File "asyncmy/connection.pyx", line 610, in asyncmy.connection.Connection._read_bytes
    data = await self._reader.readexactly(num_bytes)
  File "/usr/lib/python3.9/asyncio/streams.py", line 721, in readexactly
    raise exceptions.IncompleteReadError(incomplete, n)
asyncio.exceptions.IncompleteReadError: 0 bytes read on a total of 4 expected bytes

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
    self.dialect.do_execute(
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 732, in do_execute
    cursor.execute(statement, parameters)
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/dialects/mysql/asyncmy.py", line 92, in execute
    return self.await_(self._execute_async(operation, parameters))
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 76, in await_only
    return current.driver.switch(awaitable)
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 129, in greenlet_spawn
    value = await result
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/dialects/mysql/asyncmy.py", line 104, in _execute_async
    result = await self._cursor.execute(operation, parameters)
  File "asyncmy/cursors.pyx", line 180, in execute
    result = await self._query(query)
  File "asyncmy/cursors.pyx", line 365, in _query
    await conn.query(q)
  File "asyncmy/connection.pyx", line 455, in query
    await self._read_query_result(unbuffered=unbuffered)
  File "asyncmy/connection.pyx", line 636, in _read_query_result
    await result.read()
  File "asyncmy/connection.pyx", line 1023, in read
    first_packet = await self.connection.read_packet()
  File "asyncmy/connection.pyx", line 578, in read_packet
    packet_header = await self._read_bytes(4)
  File "asyncmy/connection.pyx", line 618, in _read_bytes
    raise errors.OperationalError(CR_SERVER_LOST, msg) from e
asyncmy.errors.OperationalError: (2013, 'Lost connection to MySQL server during query')

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "handle_request", line 83, in handle_request
    )
  File "/var/www/project/www/custom/route.py", line 162, in __call__
    response = await response
  File "/var/www/project/www/apps/webhooks/views.py", line 104, in stripe
    await account.reset_usage()
  File "/var/www/project/www/apps/accounts/models.py", line 133, in reset_usage
    while await db.read.first(query, uuid=self.uuid):
  File "/var/www/project/www/custom/database/database.py", line 73, in first
    executed = await self.execute(statement, **parameters)
  File "/var/www/project/www/custom/database/database.py", line 70, in execute
    return await super().execute(statement, parameters)
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/ext/asyncio/session.py", line 211, in execute
    return await greenlet_spawn(
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 134, in greenlet_spawn
    result = context.throw(*sys.exc_info())
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 1692, in execute
    result = conn._execute_20(statement, params or {}, execution_options)
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1614, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/sql/elements.py", line 325, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1481, in _execute_clauseelement
    ret = self._execute_context(
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1845, in _execute_context
    self._handle_dbapi_exception(
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2026, in _handle_dbapi_exception
    util.raise_(
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
    raise exception
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
    self.dialect.do_execute(
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 732, in do_execute
    cursor.execute(statement, parameters)
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/dialects/mysql/asyncmy.py", line 92, in execute
    return self.await_(self._execute_async(operation, parameters))
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 76, in await_only
    return current.driver.switch(awaitable)
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 129, in greenlet_spawn
    value = await result
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/dialects/mysql/asyncmy.py", line 104, in _execute_async
    result = await self._cursor.execute(operation, parameters)
  File "asyncmy/cursors.pyx", line 180, in execute
    result = await self._query(query)
  File "asyncmy/cursors.pyx", line 365, in _query
    await conn.query(q)
  File "asyncmy/connection.pyx", line 455, in query
    await self._read_query_result(unbuffered=unbuffered)
  File "asyncmy/connection.pyx", line 636, in _read_query_result
    await result.read()
  File "asyncmy/connection.pyx", line 1023, in read
    first_packet = await self.connection.read_packet()
  File "asyncmy/connection.pyx", line 578, in read_packet
    packet_header = await self._read_bytes(4)
  File "asyncmy/connection.pyx", line 618, in _read_bytes
    raise errors.OperationalError(CR_SERVER_LOST, msg) from e
sqlalchemy.exc.OperationalError: (asyncmy.errors.OperationalError) (2013, 'Lost connection to MySQL server during query')
[SQL: SELECT id FROM conversions WHERE [redacted] LIMIT 1]
[parameters: ('redacted',)]
(Background on this error at: https://sqlalche.me/e/14/e3q8)

连接数据库时,这里是我提供的参数:

DATABASE_POOL_PRE_PING = True
DATABASE_MAX_OVERFLOW = 10
DATABASE_POOL_SIZE = 100
DATABASE_POOL_RECYCLE = 3600
DATABASE_POOL_TIMEOUT = 5
DATABASE_CONNECT_TIMEOUT = 3

(如果您需要 MySQL 服务器端的详细信息,请告诉我要运行哪个命令,我将在此处添加输出)。

我的猜测是:退出 `async with db` 代码块时,连接由于某种原因没有被正确关闭;于是当下一个请求到来并复用同一个连接时,MySQL 最终将其终止,从而导致上述 `Lost connection to MySQL server during query` 错误。

更多详情:

  1. 错误信息始终相同,但出错的查询各不相同,说明问题并非出在某段特定代码上,而是与连接本身有关
  2. 我在从 Stripe 发送 webhook 事件时发现了这个问题。 Stripe 返回的错误是“已过期”。这似乎表明在停止之前,连接挂起(可能等待 SQL 查询完成)
  3. 这并不是每次都会发生:对于同一个事件(Stripe),我能够成功运行一些 webhooks,而其他的则不能,所以再一次,与处理请求相关的代码似乎不是错误(但也许是关于数据库的管理方式)

谢谢你的帮助!

mysql asynchronous sqlalchemy sanic
© www.soinside.com 2019 - 2024. All rights reserved.