根据 http://docs.sqlalchemy.org/en/rel_0_9/core/pooling.html#disconnect-handling-pessimistic,如果连接池中的条目不再有效,则可以对 SQLAlchemy 进行重新连接。我创建以下测试用例来测试这一点:
import subprocess
from sqlalchemy import create_engine, event
from sqlalchemy import exc
from sqlalchemy.pool import Pool
@event.listens_for(Pool, "checkout")
def ping_connection(dbapi_connection, connection_record, connection_proxy):
cursor = dbapi_connection.cursor()
try:
print "pinging server"
cursor.execute("SELECT 1")
except:
print "raising disconnect error"
raise exc.DisconnectionError()
cursor.close()
engine = create_engine('postgresql://postgres@localhost/test')
connection = engine.connect()
subprocess.check_call(['psql', str(engine.url), '-c',
"select pg_terminate_backend(pid) from pg_stat_activity " +
"where pid <> pg_backend_pid() " +
"and datname='%s';" % engine.url.database],
stdout=subprocess.PIPE)
result = connection.execute("select 'OK'")
for row in result:
print "Success!", " ".join(row)
但是我没有恢复,而是收到了这个异常:
sqlalchemy.exc.OperationalError: (OperationalError) terminating connection due to administrator command
server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
由于终端上打印了“pinging 服务器”,因此似乎可以安全地得出事件侦听器已附加的结论。如何教导 SQLAlchemy 从断开连接中恢复?
看起来checkout方法仅在您第一次从池中获取连接时被调用(例如您的
connection = engine.connect()
线)
如果您随后失去连接,则必须显式替换它,这样您就可以获取一个新连接,然后重试您的 sql:
try:
result = connection.execute("select 'OK'")
except sqlalchemy.exc.OperationalError: # may need more exceptions here
connection = engine.connect() # grab a new connection
result = connection.execute("select 'OK'") # and retry
这对于 SQL 的每一个部分来说都是很痛苦的,所以你可以使用类似的东西来包装数据库查询:
def db_execute(conn, query):
try:
result = conn.execute(query)
except sqlalchemy.exc.OperationalError: # may need more exceptions here (or trap all)
conn = engine.connect() # replace your connection
result = conn.execute(query) # and retry
return result
以下:
result = db_execute(connection, "select 'OK'")
现在应该成功了。
另一种选择是还监听 invalidate 方法,并在那时采取一些操作来替换您的连接。
您可以使用预 ping - 这会发出一个测试语句,以确保在运行实际语句之前数据库连接仍然可行。
该方法给连接检查过程增加了一点开销,但在其他方面却是完全消除由于过时的池连接而导致的数据库错误的最简单、最可靠的方法。
engine = create_engine("mysql+pymysql://user:pw@host/db", pool_pre_ping=True)