我有一个带有各种端点的 Flask 应用程序,想要添加一个把任务推送到 Celery 工作队列的端点。由于循环导入(circular import)的问题,构建 Celery worker 容器一直很困难。我尝试了几种方法让 Celery worker 识别该任务(如下所示),但 Flask 应用上下文在初始化时似乎并没有传递给 Celery worker。我用 docker compose 运行它——包含 web、db (postgres 13)、redis、celeryworker 四个服务——容器可以正常构建,但当发送本应交给 celeryworker 处理的 POST 请求时,却找不到该任务。任何帮助或指点都将不胜感激,谢谢。
错误:
celery_worker-1 | [2024-07-18 18:23:17,765: ERROR/MainProcess] Received unregistered task of type 'tasks.process_user_task'.
celery_worker-1 | The message has been ignored and discarded.
celery_worker-1 |
celery_worker-1 | Did you remember to import the module containing this task?
celery_worker-1 | Or maybe you're using relative imports?
app.py:
import os
from flask import Flask
from datetime import timedelta
from dotenv import load_dotenv
from flask_smorest import Api
import logging
def create_app(db_url=None):
    """Application factory: build the configured Flask app and its Celery handle.

    :param db_url: optional database URL overriding the DATABASE_URL env var.
    :returns: ``(app, celery)`` tuple.
    """
    app = Flask(__name__)
    load_dotenv()

    # flask-smorest / OpenAPI settings.
    app.config["PROPAGATE_EXCEPTIONS"] = True
    app.config["API_TITLE"] = "###"
    app.config["API_VERSION"] = "v1"
    app.config["OPENAPI_VERSION"] = "3.0.3"
    app.config["OPENAPI_URL_PREFIX"] = "/"
    app.config["OPENAPI_SWAGGER_UI_PATH"] = "/swagger-ui"
    app.config["OPENAPI_SWAGGER_UI_URL"] = "https://cdn.jsdelivr.net/npm/swagger-ui-dist/"

    # Database and JWT settings.
    app.config["SQLALCHEMY_DATABASE_URI"] = db_url or os.getenv("DATABASE_URL")
    app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
    app.config["JWT_SECRET_KEY"] = os.getenv("JWT_SECRET_KEY", "super_secret_key")
    app.config["JWT_ACCESS_TOKEN_EXPIRES"] = timedelta(minutes=15)

    # Function-scope imports: these modules import pieces of the app package,
    # so importing them at module load time would create a circular import.
    from db import db
    db.init_app(app)

    from flask_jwt_extended import JWTManager
    jwt = JWTManager(app)

    from resources.users import blp as UsersBlueprint
    from resources.auth import blp as AuthBlueprint
    api = Api(app)
    api.register_blueprint(UsersBlueprint)
    api.register_blueprint(AuthBlueprint)

    # Create the schema before the first request is served.
    with app.app_context():
        db.create_all()

    from celery_app import init_celery
    celery = init_celery(app)

    return app, celery


app, celery = create_app()
celery_instance.py:
from celery import Celery
import os
def make_celery(app=None):
    """Create a Celery instance, optionally bound to a Flask app's config/context.

    Fix for "Received unregistered task of type ...": the *worker* process must
    import the module that defines the tasks. Listing it in ``include`` makes
    every worker import it at startup, so the task names are registered before
    any message is consumed. Note that tasks must be declared at module import
    time for this to work — a factory that registers them lazily inside a
    request handler only registers them in the web process, never in the worker.

    :param app: optional Flask app; when given, its config is copied onto the
        Celery instance and every task runs inside the app context.
    :returns: configured :class:`celery.Celery` instance.
    """
    celery = Celery(
        app.import_name if app else __name__,
        backend=os.getenv("CELERY_RESULT_BACKEND", "redis://redis:6379/0"),
        broker=os.getenv("CELERY_BROKER_URL", "redis://redis:6379/0"),
        include=["tasks"],  # worker imports tasks.py so its tasks are registered
    )
    if app:
        celery.conf.update(app.config)

        class ContextTask(celery.Task):
            """Task base class that runs every task inside the Flask app context."""

            def __call__(self, *args, **kwargs):
                with app.app_context():
                    return self.run(*args, **kwargs)

        celery.Task = ContextTask
    return celery
celery_app.py:
from celery_instance import make_celery
# Module-level instance so the worker can import it without the Flask app.
celery = make_celery()  # Initialize Celery without the app


def init_celery(app):
    """Bind the Flask app's configuration to the shared Celery instance.

    Bug fix: the original returned ``None``, so ``celery = init_celery(app)``
    in app.py left callers holding ``None``. Return the configured instance.

    :param app: fully created Flask application.
    :returns: the module-level ``celery`` instance, now carrying app config.
    """
    with app.app_context():
        celery.conf.update(app.config)
    return celery
resources/users.py:
import uuid
import json
import os
from flask import jsonify, send_file
from flask.views import MethodView
from flask_smorest import Blueprint, abort
from sqlalchemy.exc import SQLAlchemyError
from flask_jwt_extended import jwt_required
from schemas import UserSchema, UserListSchema, UserResponseSchema
from models import Users
from db import db
from email_service import send_email
blp = Blueprint("users", "users", description="Operations on users")


@blp.route("/add_user")
class AddUser(MethodView):
    """Accept a user payload and hand it off to a background Celery task."""

    @blp.arguments(UserSchema)
    @blp.response(202, UserSchema)
    def post(self, user_data):
        # Deferred imports: importing these at module load time would create a
        # circular import (celery_app -> app -> resources.users).
        from celery_app import celery
        from tasks import create_task

        process_user_task = create_task(celery)   # bind the task to this Celery instance
        task = process_user_task.delay(user_data)  # enqueue on the broker
        return {"task_id": task.id, "status": "Processing"}, 202
@blp.route("/task_status/<task_id>")
class TaskStatus(MethodView):
    """Report the current state of a previously enqueued Celery task."""

    def get(self, task_id):
        # Deferred imports to break the circular dependency with celery_app.
        from celery_app import celery
        from tasks import create_task

        process_user_task = create_task(celery)
        task = process_user_task.AsyncResult(task_id)

        if task.state == 'PENDING':
            # Unknown task ids also show up as PENDING in Celery.
            response = {'state': task.state, 'status': 'Pending...'}
        elif task.state != 'FAILURE':
            response = {'state': task.state, 'result': task.result}
        else:
            # task.info holds the exception raised by the task.
            response = {'state': task.state, 'status': str(task.info)}
        return jsonify(response)
tasks.py:
import logging
from models import Users
from db import db
from email_service import send_email
import json
import os
import uuid
def create_task(celery):
    """Factory that defines and registers ``process_user_task`` on *celery*.

    NOTE(review): callers (resources/users.py) do
    ``process_user_task = create_task(celery)``, but no ``return`` statement is
    visible in this excerpt — confirm the elided body ends with
    ``return process_user_task``, otherwise callers receive ``None``.
    """
    @celery.task
    def process_user_task(user_data):
        # user_data arrives serialized through the broker; schema not shown here.
        logging.info("Starting process_user_task with data: %s", user_data)
        # NOTE(review): ``celery.app`` is presumably meant to be the Flask app —
        # verify; a plain Celery instance does not expose the Flask app here,
        # and the ContextTask base in celery_instance.py already pushes the
        # app context.
        with celery.app.app_context():
            logging.info("App context is active.")
            ## function ##
我尝试了以下几种方法,容器都能成功构建,但同样存在任务未注册的问题。
在 Flask 应用程序完全创建后再初始化 Celery —— app.py:
return app
app = create_app()
# Initialize Celery after the Flask app is fully created
from celery_app import init_celery
celery = init_celery(app)
将任务导入celery_app.py:
app.logger.info("Celery configured successfully")
# Import tasks to ensure they are registered with Celery
from tasks import register_tasks
register_tasks(celery)
# List registered tasks
list_registered_tasks(celery)
return celery
使用celery_instance.py中的celery.autodiscover_tasks:
def make_celery(app=None):
celery = Celery(
app.import_name if app else __name__,
backend=os.getenv("CELERY_RESULT_BACKEND", "redis://redis:6379/0"),
broker=os.getenv("CELERY_BROKER_URL", "redis://redis:6379/0"),
)
if app:
celery.conf.update(app.config)
celery.autodiscover_tasks(['tasks.process_user_tasks'], force=True)
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
return celery
还查看了一些 Stack Exchange 帖子(如下所示),并实现了一个单独的 celery_worker.py 文件,以便把 Celery 任务放在资源类之外——容器可以构建,但仍然是同样的任务未注册问题:《Celery/Flask 接收未注册类型的任务(应用工厂 + 蓝图)》:
import os
from app import create_app
from celery_app import init_celery
# Worker entry point: build the app and expose `celery` for the worker CLI.
# NOTE(review): `init_celery` is imported above but never called here —
# presumably dead code; confirm. `os` appears unused as well.
app, celery = create_app()
# Keep an application context pushed for the lifetime of the worker process.
app.app_context().push()
任何帮助或指示将不胜感激,谢谢。
补充一下我最终采用的方案来回答这个问题:我没有继续使用 Celery,而是改用了 Redis + RQ,它实现起来更简单。