我正面临一个我很难确定的问题。
我制作了一个数据库上下文系统来将请求包装在
with
中,从而创建与 Mysql 的连接。这是完整的代码:
自定义/数据库/database.py
# -*- coding:utf-8 -*-
from sqlalchemy import exc, event
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession as SQLAlchemyAsyncSession
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.pool import Pool, QueuePool # NullPool
from sqlalchemy.exc import OperationalError
from contextvars import ContextVar
from sanic import Sanic
class EngineNotInitialisedError(Exception):
pass
class DBSessionContext:
def __init__(self, read_session: Session, write_session: Session, commit_on_exit: bool = True) -> None:
self.read_session = read_session
self.write_session = write_session
self.commit_on_exit = commit_on_exit
self.token = None
self._read = None
self._write = None
def _disable_flush(self, *args, **kwargs):
raise NotImplementedError('Unable to flush a read-only session.')
async def close(self, exc_type=None, exc_value=None, traceback=None):
if self._write:
if exc_value and getattr(exc_value, 'status_code', 500) > 300:
await self._write.rollback()
else:
await self._write.commit()
try:
await self._write.close()
except OperationalError as e:
if e.orig.args[0] != 2013: # Lost connection to MySQL server during query
raise e
if self._read:
try:
await self._read.close()
except OperationalError as e:
if e.orig.args[0] != 2013: # Lost connection to MySQL server during query
raise e
def set_token(self, token):
self.token = token
@property
def read(self) -> Session:
if not self._read:
self._read = self.read_session()
self._read.flush = self._disable_flush
return self._read
@property
def write(self) -> Session:
if not self._write:
self._write = self.write_session()
return self._write
class AsyncSession(SQLAlchemyAsyncSession):
async def execute(self, statement, **parameters):
return await super().execute(statement, parameters)
async def first(self, statement, **parameters):
executed = await self.execute(statement, **parameters)
return executed.first()
async def all(self, statement, **parameters):
executed = await self.execute(statement, **parameters)
return executed.all()
class DBSession:
def __init__(self):
self.app = None
self.read_engine = None
self.read_session = None
self.write_engine = None
self.write_session = None
self._session = None
self.context = ContextVar("context", default=None)
self.commit_on_exit = True
def init_app(self, app: Sanic) -> None:
self.app = app
self.commit_on_exit = self.app.config.get('DATABASE_COMMIT_ON_EXIT', cast=bool, default=True)
engine_args = {
'echo': self.app.config.get('DATABASE_ECHO', cast=bool, default=False),
'echo_pool': self.app.config.get('DATABASE_ECHO_POOL', cast=bool, default=False),
'poolclass': QueuePool, # will be used to create a connection pool instance using the connection parameters given in the URL
# if pool_class is not NullPool:
# if True will enable the connection pool “pre-ping” feature that tests connections for liveness upon each checkout
'pool_pre_ping': self.app.config.get('DATABASE_POOL_PRE_PING', cast=bool, default=True),
# the number of connections to allow in connection pool “overflow”
'max_overflow': self.app.config.get('DATABASE_MAX_OVERFLOW', cast=int, default=10),
# the number of connections to keep open inside the connection pool
'pool_size': self.app.config.get('DATABASE_POOL_SIZE', cast=int, default=100),
# this setting causes the pool to recycle connections after the given number of seconds has passed
'pool_recycle': self.app.config.get('DATABASE_POOL_RECYCLE', cast=int, default=3600),
# number of seconds to wait before giving up on getting a connection from the pool
'pool_timeout': self.app.config.get('DATABASE_POOL_TIMEOUT', cast=int, default=5),
}
self.read_engine = create_async_engine(
self.app.config.get('DATABASE_READ_URL'),
connect_args={
'connect_timeout': self.app.config.get('DATABASE_CONNECT_TIMEOUT', cast=int, default=3)
},
**engine_args
)
# @see https://writeonly.wordpress.com/2009/07/16/simple-read-only-sqlalchemy-sessions/
self.read_session = sessionmaker(
bind=self.read_engine,
expire_on_commit=False,
class_=AsyncSession,
autoflush=False,
autocommit=False
)
self.write_engine = create_async_engine(
self.app.config.get('DATABASE_WRITE_URL'),
connect_args={
'connect_timeout': self.app.config.get('DATABASE_CONNECT_TIMEOUT', cast=int, default=3)
},
**engine_args
)
self.write_session = sessionmaker(
bind=self.write_engine,
expire_on_commit=False,
class_=AsyncSession,
autoflush=True
)
async def __aenter__(self):
session_ctx = DBSessionContext(self.read_session, self.write_session, self.commit_on_exit)
session_ctx.set_token(self.context.set(session_ctx))
return session_ctx
async def __aexit__(self, exc_type, exc_value, traceback):
session_ctx = self.context.get()
await session_ctx.close(exc_type, exc_value, traceback)
self.context.reset(session_ctx.token)
@property
def read(self) -> Session:
return self.context.get().read
@property
def write(self) -> Session:
return self.context.get().write
@event.listens_for(Pool, "checkout")
def check_connection(dbapi_con, con_record, con_proxy):
'''Listener for Pool checkout events that pings every connection before using.
Implements pessimistic disconnect handling strategy. See also:
http://docs.sqlalchemy.org/en/rel_0_8/core/pooling.html#disconnect-handling-pessimistic'''
cursor = dbapi_con.cursor()
try:
cursor.execute("SELECT 1")
except exc.OperationalError as ex:
if ex.args[0] in (2006, # MySQL server has gone away
2013, # Lost connection to MySQL server during query
2055): # Lost connection to MySQL server at '%s', system error: %d
raise exc.DisconnectionError() # caught by pool, which will retry with a new connection
else:
raise
cursor.close()
db = DBSession()
使用起来很简单,我做了以下操作。在路由器中,我制作了一个包装器,它使用启动的数据库调用处理程序:
自定义/route.py
class Route:
async def __call__(self, request: Request, **kwargs):
async with db:
response = await self.handler(*args)
# process the response, such as chaning a str to a text response, etc
return response
不幸的是,我注意到我有很多
(2013 年,“查询期间与 MySQL 服务器失去连接”)
我不知道这是怎么发生的,也不知道为什么会发生。这发生在相对较小的查询中(包含“LIMIT 1”,索引列应该很快)
这是完整的堆栈跟踪:
[2022-05-19 09:35:25 +0000] [92185] [ERROR] Exception occurred while handling uri: 'https://api.pdfshift.io/redacted'
Traceback (most recent call last):
File "asyncmy/connection.pyx", line 610, in asyncmy.connection.Connection._read_bytes
data = await self._reader.readexactly(num_bytes)
File "/usr/lib/python3.9/asyncio/streams.py", line 721, in readexactly
raise exceptions.IncompleteReadError(incomplete, n)
asyncio.exceptions.IncompleteReadError: 0 bytes read on a total of 4 expected bytes
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
self.dialect.do_execute(
File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 732, in do_execute
cursor.execute(statement, parameters)
File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/dialects/mysql/asyncmy.py", line 92, in execute
return self.await_(self._execute_async(operation, parameters))
File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 76, in await_only
return current.driver.switch(awaitable)
File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 129, in greenlet_spawn
value = await result
File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/dialects/mysql/asyncmy.py", line 104, in _execute_async
result = await self._cursor.execute(operation, parameters)
File "asyncmy/cursors.pyx", line 180, in execute
result = await self._query(query)
File "asyncmy/cursors.pyx", line 365, in _query
await conn.query(q)
File "asyncmy/connection.pyx", line 455, in query
await self._read_query_result(unbuffered=unbuffered)
File "asyncmy/connection.pyx", line 636, in _read_query_result
await result.read()
File "asyncmy/connection.pyx", line 1023, in read
first_packet = await self.connection.read_packet()
File "asyncmy/connection.pyx", line 578, in read_packet
packet_header = await self._read_bytes(4)
File "asyncmy/connection.pyx", line 618, in _read_bytes
raise errors.OperationalError(CR_SERVER_LOST, msg) from e
asyncmy.errors.OperationalError: (2013, 'Lost connection to MySQL server during query')
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "handle_request", line 83, in handle_request
)
File "/var/www/project/www/custom/route.py", line 162, in __call__
response = await response
File "/var/www/project/www/apps/webhooks/views.py", line 104, in stripe
await account.reset_usage()
File "/var/www/project/www/apps/accounts/models.py", line 133, in reset_usage
while await db.read.first(query, uuid=self.uuid):
File "/var/www/project/www/custom/database/database.py", line 73, in first
executed = await self.execute(statement, **parameters)
File "/var/www/project/www/custom/database/database.py", line 70, in execute
return await super().execute(statement, parameters)
File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/ext/asyncio/session.py", line 211, in execute
return await greenlet_spawn(
File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 134, in greenlet_spawn
result = context.throw(*sys.exc_info())
File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 1692, in execute
result = conn._execute_20(statement, params or {}, execution_options)
File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1614, in _execute_20
return meth(self, args_10style, kwargs_10style, execution_options)
File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/sql/elements.py", line 325, in _execute_on_connection
return connection._execute_clauseelement(
File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1481, in _execute_clauseelement
ret = self._execute_context(
File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1845, in _execute_context
self._handle_dbapi_exception(
File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2026, in _handle_dbapi_exception
util.raise_(
File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
raise exception
File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
self.dialect.do_execute(
File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 732, in do_execute
cursor.execute(statement, parameters)
File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/dialects/mysql/asyncmy.py", line 92, in execute
return self.await_(self._execute_async(operation, parameters))
File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 76, in await_only
return current.driver.switch(awaitable)
File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 129, in greenlet_spawn
value = await result
File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/dialects/mysql/asyncmy.py", line 104, in _execute_async
result = await self._cursor.execute(operation, parameters)
File "asyncmy/cursors.pyx", line 180, in execute
result = await self._query(query)
File "asyncmy/cursors.pyx", line 365, in _query
await conn.query(q)
File "asyncmy/connection.pyx", line 455, in query
await self._read_query_result(unbuffered=unbuffered)
File "asyncmy/connection.pyx", line 636, in _read_query_result
await result.read()
File "asyncmy/connection.pyx", line 1023, in read
first_packet = await self.connection.read_packet()
File "asyncmy/connection.pyx", line 578, in read_packet
packet_header = await self._read_bytes(4)
File "asyncmy/connection.pyx", line 618, in _read_bytes
raise errors.OperationalError(CR_SERVER_LOST, msg) from e
sqlalchemy.exc.OperationalError: (asyncmy.errors.OperationalError) (2013, 'Lost connection to MySQL server during query')
[SQL: SELECT id FROM conversions WHERE [redacted] LIMIT 1]
[parameters: ('redacted',)]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
连接数据库时,这里是我提供的参数:
DATABASE_POOL_PRE_PING = True
DATABASE_MAX_OVERFLOW = 10
DATABASE_POOL_SIZE = 100
DATABASE_POOL_RECYCLE = 3600
DATABASE_POOL_TIMEOUT = 5
DATABASE_CONNECT_TIMEOUT = 3
(如果您需要 MySQL 服务器端的详细信息,请告诉我要运行哪个命令,我将在此处添加输出)。
我的假设是不知何故,退出
async with db
部分时连接没有正确关闭,所以当另一个请求进来时,使用相同的连接,但最终,MySQL将其杀死,导致上述错误Lost connection to MySQL server during query
更多详情:
谢谢你的帮助!