将 SQLAlchemy 会话与 Celery 结合使用的正确方法是什么?

问题描述 投票:0回答:3

我已经尝试弄清楚这个问题有一段时间了,但我现在能找到的很多答案都已经过时(超过 6 年前的帖子),或者相关性较低。

问题实际上是如何在 celery 中正确处理数据库会话。我当前的设置是:我有一个全局 DbEngine 对象,其中包含

dsn
engine
Session
。因此,每次如果我想使用会话,我只需调用
sess = db.Session()
并开始在我的 Flask 应用程序中使用它们。它看起来像下面这样:

#db.py
class DbEngine:
  def __init__(self, path, ...):
    self.dsn = self.create_dsn_from_file(path)
    self.engine = create_engine(self.dsn)
    self.Session = scoped_session(sessionmaker(bind=self.engine))

在我将 celery 引入我的应用程序之前,我经常遇到各种错误(协议错误、sslSocket 错误等)。我无法在本地重现它们,如果我简单地向这些任务添加重试(通常会重试 3 次),它通常会得到修复。所以我怀疑这可能是由共享会话引起的。

然后我决定更改 celery 应用程序的会话:每次如果我需要会话,我实际上会创建一个新引擎,创建一个新会话,并返回新创建的

Session()
。然而,类似但不完全相同的问题又发生了(不同代码的各种协议错误)。

我看到celery有自己的SessionManager,但我找不到使用示例代码。我正在考虑以下结构:

# celery_app.py

celery = ... 

session_manager = SessionManager()
engine, Session = session_manager.create_session(dsn)

在我所做的任务中:

# task_1.py
from celery_app import celery, Session

@celery.task
def tsk():
  sess = Session()
  sess.query(...)
  ... 
  sess.close()

但真的不确定这是否是预期的方法,因为我只调用

.create_session()
一次,而且我不知道何时以及如何调用 SessionManager 中的其他函数。

对于后台,我使用的是DB2。

如果有人知道如何正确使用 SessionManager 或有使用 SQLAlchemy 会话的经验,如果您能提供任何见解,我们将不胜感激。预先感谢您!

python flask orm sqlalchemy celery
3个回答
2
投票

发布此内容希望对某人有所帮助。就我而言,我们使用 Fastapi、Postgres、Sqlalchemy 和 Celery。

我也遇到了同样的问题,这非常具有挑战性。我们的 postgres 数据库中存在许多神秘错误,看起来连接在中途停止。

PG_TUPLES_OK and no message from the libpq
AttributeError("'_NoResultMetaData' object has no attribute '_indexes_for_keys'")
...等等等等

TL;博士

Sqlalchemy 默认以非线程安全的方式池化连接, Celery 默认分叉进程:需要更改其中之一。

解决方案1)关闭Sqlalchemy池化 *我们最终选择这样做是为了保持更好的并发性

Sql Alchemy 文档

from sqlalchemy.pool import NullPool
engine = create_engine(
    SQLALCHEMY_DATABASE_URL, poolclass=NullPool
)

解决方案 2)使 Celery 作为单个进程运行,无需分叉

celery -A celery_worker.celery worker -E --loglevel=info --pool=solo

有关芹菜池的文章 这个解决方案有效,但意味着我们所有的 celery 任务都将串行运行,这消除了错误,但我们需要更多带宽。这对于某些应用程序可能没问题。


0
投票

也许不是您正在寻找的答案,但是

flask_sqlalchemy
在 SQLAlchemy 上提供了一个 Flask 友好的层,几乎完全消除了手动管理会话的需要。一旦您按照 Flask 文档中的示例设置了 ContextTask,它就可以很好地与 celery 配合使用。


0
投票

使用一个多吨对象来为每个任务保存一个

engine
怎么样?

这样,引擎将仅在第一次执行时创建(或者在启动时创建,如果您将其设置为执行此操作),并且该引擎将可以通过与该实例匹配的 fork 重用...

类似:

{
   "process_1": engine_object,
   "process_2": engine_object,
   "process_3": engine_object,
   ...
}
© www.soinside.com 2019 - 2024. All rights reserved.