我正在开发一个 Flask 服务器,并尝试集成 celery 来处理通过 Webhook 传入的数据转换。我在 Flask 设置中广泛使用环境变量来处理 API 密钥和数据库访问等内容,并且我将所有这些都存储在 .env 文件中。我希望能够在 celery Worker 中拥有一个 Flask 实例,以提供对 sqlalchemy 模型、数据库对象等的访问,但是当我尝试创建一个 Worker 时,Flask 的应用程序工厂失败,因为 .env 变量无处可寻。在尝试 db init 之前,我应该通过某种方式将它们预加载到 Flask 中吗?
运行命令
celery -A make_celery worker --loglevel INFO
结果
RuntimeError: Either 'SQLALCHEMY_DATABASE_URI' or 'SQLALCHEMY_BINDS' must be set.
Flask 能够自行运行得很好,我正在艰难地调试工作线程,因为它没有打印到控制台。
我的
make_celery
文件是这样的:
import os
from app import create_app
flask_app = create_app(os.getenv('FLASK_CONFIG') or 'default')
celery_app = flask_app.extensions["celery"]
create_app
方法(匿名)是这样的:
from celery_app import celery_init_app
from config import config
db = SQLAlchemy()
def create_app(config_name):
app = Flask(__name__)
app.config.from_object(config[config_name])
config[config_name].init_app(app)
# worker fails here every time as SQLAlCHEMY_DATABASE_URI is not set
db.init_app(app)
with app.app_context():
#not included is importing of models and migrate init
app.config.from_mapping(
CELERY=dict(
broker_url="amqp://guest:[email protected]:5672//",
result_backend=app.config['CELERY_BACKEND_URL'],
task_ignore_result=True,
),
)
app.config.from_prefixed_env()
celery_init_app(app)
return app
以及 create_app() 方法中包含的 celery_init_app() 方法:
def celery_init_app(app: Flask) -> Celery:
class FlaskTask(Task):
def __call__(self, *args: object, **kwargs: object) -> object:
with app.app_context():
return self.run(*args, **kwargs)
celery_app = Celery(app.name, task_cls=FlaskTask)
celery_app.config_from_object(app.config["CELERY"])
celery_app.set_default()
app.extensions["celery"] = celery_app
return celery_app
我对 Flask 和 Celery 都是新手,无法破解这个。
我运行了上面的 celery Worker 命令,期望它创建一个能够访问 Flask 应用程序实例及其所有组件的 Worker,但由于缺少环境变量,它无法启动 sqlalchemy db 对象。
发布此消息后立即得到 Miguel Grinberg 的回复并修复了该问题。只需要在 make_celery 方法中加载 load_dotenv 即可。希望这对其他人有帮助!