我已经尝试弄清楚这个问题有一段时间了,但我现在能找到的很多答案都已经过时(超过 6 年前的帖子),或者相关性较低。
问题实际上是如何在 celery 中正确处理数据库会话。我当前的设置是:我有一个全局 DbEngine 对象,其中包含
dsn
、engine
和 Session
。因此,每次如果我想使用会话,我只需调用 sess = db.Session()
并开始在我的 Flask 应用程序中使用它们。它看起来像下面这样:
#db.py
class DbEngine:
def __init__(self, path, ...):
self.dsn = self.create_dsn_from_file(path)
self.engine = create_engine(self.dsn)
self.Session = scoped_session(sessionmaker(bind=self.engine))
在我将 celery 引入我的应用程序之前,我经常遇到各种错误(协议错误、sslSocket 错误等)。我无法在本地重现它们,如果我简单地向这些任务添加重试(通常会重试 3 次),它通常会得到修复。所以我怀疑这可能是由共享会话引起的。
然后我决定更改 celery 应用程序的会话:每次如果我需要会话,我实际上会创建一个新引擎,创建一个新会话,并返回新创建的
Session()
。然而,类似但不完全相同的问题又发生了(不同代码的各种协议错误)。
我看到celery有自己的SessionManager,但我找不到使用示例代码。我正在考虑以下结构:
# celery_app.py
celery = ...
session_manager = SessionManager()
engine, Session = session_manager.create_session(dsn)
在我所做的任务中:
# task_1.py
from celery_app import celery, Session
@celery.task
def tsk():
sess = Session()
sess.query(...)
...
sess.close()
但真的不确定这是否是预期的方法,因为我只调用
.create_session()
一次,而且我不知道何时以及如何调用 SessionManager 中的其他函数。
对于后台,我使用的是DB2。
如果有人知道如何正确使用 SessionManager 或有使用 SQLAlchemy 会话的经验,如果您能提供任何见解,我们将不胜感激。预先感谢您!
发布此内容希望对某人有所帮助。就我而言,我们使用 Fastapi、Postgres、Sqlalchemy 和 Celery。
我也遇到了同样的问题,这非常具有挑战性。我们的 postgres 数据库中存在许多神秘错误,看起来连接在中途停止。
PG_TUPLES_OK and no message from the libpq
,AttributeError("'_NoResultMetaData' object has no attribute '_indexes_for_keys'")
...等等等等
TL;博士
Sqlalchemy 默认以非线程安全的方式池化连接, Celery 默认分叉进程:需要更改其中之一。
解决方案1)关闭Sqlalchemy池化 *我们最终选择这样做是为了保持更好的并发性
from sqlalchemy.pool import NullPool
engine = create_engine(
SQLALCHEMY_DATABASE_URL, poolclass=NullPool
)
解决方案 2)使 Celery 作为单个进程运行,无需分叉
celery -A celery_worker.celery worker -E --loglevel=info --pool=solo
有关芹菜池的文章 这个解决方案有效,但意味着我们所有的 celery 任务都将串行运行,这消除了错误,但我们需要更多带宽。这对于某些应用程序可能没问题。
也许不是您正在寻找的答案,但是
flask_sqlalchemy
在 SQLAlchemy 上提供了一个 Flask 友好的层,几乎完全消除了手动管理会话的需要。一旦您按照 Flask 文档中的示例设置了 ContextTask,它就可以很好地与 celery 配合使用。
使用一个多吨对象来为每个任务保存一个
engine
怎么样?
这样,引擎将仅在第一次执行时创建(或者在启动时创建,如果您将其设置为执行此操作),并且该引擎将可以通过与该实例匹配的 fork 重用...
类似:
{
"process_1": engine_object,
"process_2": engine_object,
"process_3": engine_object,
...
}