我正在使用 python Flask 构建一个网站。 一切都很顺利,现在我正在尝试实施芹菜。
一切进展顺利,直到我尝试使用 celery 的 Flask-mail 发送电子邮件。 现在我收到“在应用程序上下文之外工作”错误。
完整的追溯是
Traceback (most recent call last):
File "/usr/lib/python2.7/site-packages/celery/task/trace.py", line 228, in trace_task
R = retval = fun(*args, **kwargs)
File "/usr/lib/python2.7/site-packages/celery/task/trace.py", line 415, in __protected_call__
return self.run(*args, **kwargs)
File "/home/ryan/www/CG-Website/src/util/mail.py", line 28, in send_forgot_email
msg = Message("Recover your Crusade Gaming Account")
File "/usr/lib/python2.7/site-packages/flask_mail.py", line 178, in __init__
sender = current_app.config.get("DEFAULT_MAIL_SENDER")
File "/usr/lib/python2.7/site-packages/werkzeug/local.py", line 336, in __getattr__
return getattr(self._get_current_object(), name)
File "/usr/lib/python2.7/site-packages/werkzeug/local.py", line 295, in _get_current_object
return self.__local()
File "/usr/lib/python2.7/site-packages/flask/globals.py", line 26, in _find_app
raise RuntimeError('working outside of application context')
RuntimeError: working outside of application context
这是我的邮件功能:
@celery.task
def send_forgot_email(email, ref):
global mail
msg = Message("Recover your Crusade Gaming Account")
msg.recipients = [email]
msg.sender = "Crusade Gaming [email protected]"
msg.html = \
"""
Hello Person,<br/>
You have requested your password be reset. <a href="{0}" >Click here recover your account</a> or copy and paste this link in to your browser: {0} <br />
If you did not request that your password be reset, please ignore this.
""".format(url_for('account.forgot', ref=ref, _external=True))
mail.send(msg)
这是我的芹菜文件:
from __future__ import absolute_import
from celery import Celery
celery = Celery('src.tasks',
broker='amqp://',
include=['src.util.mail'])
if __name__ == "__main__":
celery.start()
这是一个与 Flask 应用程序工厂模式配合使用的解决方案,还可以创建具有上下文的 celery 任务,而无需使用
app.app_context()
。在避免循环导入的同时获取该应用程序确实很棘手,但这解决了它。这是针对 celery 4.2 的,这是撰写本文时最新的版本。
结构:
repo_name/
manage.py
base/
base/__init__.py
base/app.py
base/runcelery.py
base/celeryconfig.py
base/utility/celery_util.py
base/tasks/workers.py
所以
base
是本例中的主要应用程序包。在 base/__init__.py
中,我们创建 celery 实例,如下所示:
from celery import Celery
celery = Celery('base', config_source='base.celeryconfig')
base/app.py
文件包含 Flask 应用工厂 create_app
并记下它包含的 init_celery(app, celery)
:
from base import celery
from base.utility.celery_util import init_celery
def create_app(config_obj):
"""An application factory, as explained here:
http://flask.pocoo.org/docs/patterns/appfactories/.
:param config_object: The configuration object to use.
"""
app = Flask('base')
app.config.from_object(config_obj)
init_celery(app, celery=celery)
register_extensions(app)
register_blueprints(app)
register_errorhandlers(app)
register_app_context_processors(app)
return app
继续
base/runcelery.py
内容:
from flask.helpers import get_debug_flag
from base.settings import DevConfig, ProdConfig
from base import celery
from base.app import create_app
from base.utility.celery_util import init_celery
CONFIG = DevConfig if get_debug_flag() else ProdConfig
app = create_app(CONFIG)
init_celery(app, celery)
接下来,
base/celeryconfig.py
文件(作为示例):
# -*- coding: utf-8 -*-
"""
Configure Celery. See the configuration guide at ->
http://docs.celeryproject.org/en/master/userguide/configuration.html#configuration
"""
## Broker settings.
broker_url = 'pyamqp://guest:guest@localhost:5672//'
broker_heartbeat=0
# List of modules to import when the Celery worker starts.
imports = ('base.tasks.workers',)
## Using the database to store task state and results.
result_backend = 'rpc'
#result_persistent = False
accept_content = ['json', 'application/text']
result_serializer = 'json'
timezone = "UTC"
# define periodic tasks / cron here
# beat_schedule = {
# 'add-every-10-seconds': {
# 'task': 'workers.add_together',
# 'schedule': 10.0,
# 'args': (16, 16)
# },
# }
现在在
base/utility/celery_util.py
文件中定义init_celery:
# -*- coding: utf-8 -*-
def init_celery(app, celery):
"""Add flask app context to celery.Task"""
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
对于
base/tasks/workers.py
的工人:
from base import celery as celery_app
from flask_security.utils import config_value, send_mail
from base.bp.users.models.user_models import User
from base.extensions import mail # this is the flask-mail
@celery_app.task
def send_async_email(msg):
"""Background task to send an email with Flask-mail."""
#with app.app_context():
mail.send(msg)
@celery_app.task
def send_welcome_email(email, user_id, confirmation_link):
"""Background task to send a welcome email with flask-security's mail.
You don't need to use with app.app_context() here. Task has context.
"""
user = User.query.filter_by(id=user_id).first()
print(f'sending user {user} a welcome email')
send_mail(config_value('EMAIL_SUBJECT_REGISTER'),
email,
'welcome', user=user,
confirmation_link=confirmation_link)
然后,您需要从repo_name
文件夹中的两个不同的cmd提示符中启动celerybeat和celeryworker。 在一个 cmd 提示符下执行
celery -A base.runcelery:celery beat
,然后执行另一个
celery -A base.runcelery:celery worker
。然后,运行需要 Flask 上下文的任务。应该可以工作。
with app.app_context():
celery.start()
mail
而我的错误是在芹菜任务中尝试使用
url_for
,我怀疑这两个问题与同一问题相关,并且您会因使用
url_for
如果您之前尝试过使用它的话
mail
。由于 celery 任务中没有应用程序的上下文(即使在包含
import app from my_app_module
后),我也遇到了错误。 您需要在应用程序的上下文中执行
mail
操作:
from module_containing_my_app_and_mail import app, mail # Flask app, Flask mail
from flask.ext.mail import Message # Message class
@celery.task
def send_forgot_email(email, ref):
with app.app_context(): # This is the important bit!
msg = Message("Recover your Crusade Gaming Account")
msg.recipients = [email]
msg.sender = "Crusade Gaming [email protected]"
msg.html = \
"""
Hello Person,<br/>
You have requested your password be reset. <a href="{0}" >Click here recover your account</a> or copy and paste this link in to your browser: {0} <br />
If you did not request that your password be reset, please ignore this.
""".format(url_for('account.forgot', ref=ref, _external=True))
mail.send(msg)
如果有人感兴趣,可以在
here
找到我在芹菜任务中使用url_for
问题的解决方案
Celery
应用程序的文件
# celery_app_file.py
from celery import Celery
celery_app = Celery(__name__)
Flask
应用程序并使用它对之前创建的
Celery
应用程序进行猴子修补
# flask_app_file.py
from flask import Flask
from celery_app import celery_app
flask_app = Flask(__name__)
class ContextTask(celery_app.Task):
def __call__(self, *args, **kwargs):
with flask_app.app_context():
if self.abstract:
return super().__call__(*args, **kwargs)
else:
return super().__call__(self, *args, **kwargs)
celery_app.Task = ContextTask
现在,任何时候您将 Celery 应用程序导入到不同的文件中(例如 mailing/tasks.py
包含电子邮件相关的内容,或
database/tasks.py
包含数据库相关的内容),它将是已经经过猴子修补的版本,可以在Flask 上下文。要记住的重要一点是,当您通过命令行启动 Celery 时,这种猴子修补
必须发生。这意味着(使用我的示例)您必须运行 celery -A flask_app_file.celery_app worker
,因为
flask_app_file.py
是包含
celery_app
变量的文件,并为其分配了经过猴子修补的 Celery 应用程序。
from whateverpackagename import app
from whateverpackagename import mail
@celery.task
def send_forgot_email(email, ref):
with app.test_request_context():
msg = Message("Recover your Crusade Gaming Account")
msg.recipients = [email]
msg.sender = "Crusade Gaming [email protected]"
msg.html = \
"""
Hello Person,<br/>
You have requested your password be reset. <a href="{0}" >Click here recover your account</a> or copy and paste this link in to your browser: {0} <br />
If you did not request that your password be reset, please ignore this.
""".format(url_for('account.forgot', ref=ref, _external=True))
mail.send(msg)
app.app_context()
,只需在注册蓝图之前配置 celery 即可,如下所示:
celery = Celery('myapp', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
从您希望使用 celery 的蓝图中,调用已创建的 celery 实例来创建您的 celery 任务。
它将按预期工作。
celery.Task
子类是一个干净的解决方案。而且它不必通过猴子补丁来完成。 Flask 有一个扩展点。
代码示例来自
https://flask.palletsprojects.com/en/stable/patterns/celery/ :
# from https://flask.palletsprojects.com/en/stable/patterns/celery/
from celery import Celery, Task
def celery_init_app(app: Flask) -> Celery:
class FlaskTask(Task):
def __call__(self, *args: object, **kwargs: object) -> object:
with app.app_context():
return self.run(*args, **kwargs)
celery_app = Celery(app.name, task_cls=FlaskTask)
celery_app.config_from_object(app.config["CELERY"])
celery_app.set_default()
app.extensions["celery"] = celery_app
return celery_app