将任务从 Flask 应用程序传递到 Celery Worker - Flask 应用程序/蓝图/Docker

问题描述 投票:0回答:1

我有一个带有多个端点的 Flask 应用程序,想把其中一个端点的处理交给 Celery worker 的任务队列。由于循环导入,构建 Celery worker 容器一直很困难。我尝试了几种方法让 Celery worker 识别该任务(如下所示),但 Flask 应用上下文似乎在初始化时没有传递给 Celery worker。使用 docker compose 运行 web、db(postgres 13)、redis、celeryworker 时,容器都能正常构建,但是在发送本应交给 celeryworker 处理的 POST 请求时,却找不到上下文。任何帮助或指示将不胜感激,谢谢。

错误:

celery_worker-1  | [2024-07-18 18:23:17,765: ERROR/MainProcess] Received unregistered task of type 'tasks.process_user_task'.
celery_worker-1  | The message has been ignored and discarded.
celery_worker-1  | 
celery_worker-1  | Did you remember to import the module containing this task?
celery_worker-1  | Or maybe you're using relative imports?

app.py:

import os
from flask import Flask
from datetime import timedelta
from dotenv import load_dotenv
from flask_smorest import Api
import logging

def create_app(db_url=None):
    """Application factory: build the Flask app and wire up its extensions.

    Args:
        db_url: Optional database URI. When falsy, the ``DATABASE_URL``
            environment variable is used instead.

    Returns:
        A ``(app, celery)`` tuple, where ``celery`` is whatever
        ``celery_app.init_celery(app)`` returns.
    """
    app = Flask(__name__)

    # Load .env before any os.getenv lookup below.
    load_dotenv()

    app.config.update(
        {
            "PROPAGATE_EXCEPTIONS": True,
            "API_TITLE": "###",
            "API_VERSION": "v1",
            "OPENAPI_VERSION": "3.0.3",
            "OPENAPI_URL_PREFIX": "/",
            "OPENAPI_SWAGGER_UI_PATH": "/swagger-ui",
            "OPENAPI_SWAGGER_UI_URL": "https://cdn.jsdelivr.net/npm/swagger-ui-dist/",
            "SQLALCHEMY_DATABASE_URI": db_url or os.getenv("DATABASE_URL"),
            "SQLALCHEMY_TRACK_MODIFICATIONS": False,
            "JWT_SECRET_KEY": os.getenv("JWT_SECRET_KEY", "super_secret_key"),
            "JWT_ACCESS_TOKEN_EXPIRES": timedelta(minutes=15),
        }
    )

    # Project modules are imported inside the factory, after the app exists.
    from db import db
    db.init_app(app)

    from flask_jwt_extended import JWTManager
    JWTManager(app)

    from resources.users import blp as UsersBlueprint
    from resources.auth import blp as AuthBlueprint
    api = Api(app)
    for blueprint in (UsersBlueprint, AuthBlueprint):
        api.register_blueprint(blueprint)

    # create_all needs an active application context.
    with app.app_context():
        db.create_all()

    from celery_app import init_celery
    return app, init_celery(app)

# Build the Flask app and the Celery handle at import time so that importing
# this module exposes both names.
app, celery = create_app()

celery_instance.py:

from celery import Celery
import os

def make_celery(app=None):
    """Create a Celery instance, optionally bound to a Flask app.

    Args:
        app: Optional Flask application. When given, its config is copied
            into the Celery config and tasks are run inside its app context.

    Returns:
        The configured Celery instance.
    """
    main_name = __name__ if not app else app.import_name
    result_backend = os.getenv("CELERY_RESULT_BACKEND", "redis://redis:6379/0")
    broker_url = os.getenv("CELERY_BROKER_URL", "redis://redis:6379/0")
    celery = Celery(main_name, backend=result_backend, broker=broker_url)

    # Without a Flask app there is nothing more to configure.
    if not app:
        return celery

    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        """Task base class that runs the task body inside the Flask app context."""

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

celery_app.py:

from celery_instance import make_celery

# Module-level Celery instance shared by importers of this module. It is
# created without a Flask app; init_celery() copies the app config in later.
celery = make_celery()  # Initialize Celery without the app

def init_celery(app):
    """Copy the Flask app's config into the module-level Celery instance.

    Args:
        app: Flask application whose ``config`` is merged into ``celery.conf``.

    Returns:
        The module-level ``celery`` instance. Callers bind the result
        (``celery = init_celery(app)``), so returning nothing left them
        holding ``None``.
    """
    with app.app_context():
        celery.conf.update(app.config)
    return celery

resources/users.py:

import uuid
import json
import os
from flask import jsonify, send_file
from flask.views import MethodView
from flask_smorest import Blueprint, abort
from sqlalchemy.exc import SQLAlchemyError
from flask_jwt_extended import jwt_required
from schemas import UserSchema, UserListSchema, UserResponseSchema
from models import Users
from db import db
from email_service import send_email

# Blueprint on which the route classes in this module are registered.
blp = Blueprint("users", "users", description="Operations on users")

@blp.route("/add_user")
class AddUser(MethodView):
    """POST endpoint that hands user data to a background task."""

    @blp.arguments(UserSchema)
    @blp.response(202, UserSchema)
    def post(self, user_data):
        """Queue ``user_data`` for processing and return the task id with 202.

        Relies on ``create_task`` returning the task object it defines.
        """
        # Imported at call time to avoid a circular import at module load.
        from celery_app import celery
        from tasks import create_task

        queued = create_task(celery).delay(user_data)
        payload = {"task_id": queued.id, "status": "Processing"}
        return payload, 202

@blp.route("/task_status/<task_id>")
class TaskStatus(MethodView):
    """GET endpoint that reports the state of a previously queued task."""

    def get(self, task_id):
        """Return a JSON description of the task identified by ``task_id``.

        Relies on ``create_task`` returning the task object it defines.
        """
        # Imported at call time to avoid a circular import at module load.
        from celery_app import celery
        from tasks import create_task

        outcome = create_task(celery).AsyncResult(task_id)

        if outcome.state == 'PENDING':
            return jsonify({'state': outcome.state, 'status': 'Pending...'})

        if outcome.state == 'FAILURE':
            # On failure, report the stringified ``info`` (exception info).
            return jsonify({'state': outcome.state, 'status': str(outcome.info)})

        # Any other state: report the state together with the current result.
        return jsonify({'state': outcome.state, 'result': outcome.result})

tasks.py:

import logging
from models import Users
from db import db
from email_service import send_email
import json
import os
import uuid

def create_task(celery):
    """Define ``process_user_task`` on *celery* and return the task.

    Args:
        celery: Object exposing a ``task`` decorator (a Celery app in the
            callers shown in this file).

    Returns:
        Whatever ``celery.task`` returns for ``process_user_task``, so callers
        can use ``.delay(...)`` / ``.AsyncResult(...)`` on it.
    """
    @celery.task
    def process_user_task(user_data):
        logging.info("Starting process_user_task with data: %s", user_data)
        # NOTE(review): assumes ``celery.app`` is the Flask application;
        # nothing visible here sets that attribute -- confirm.
        with celery.app.app_context():
            logging.info("App context is active.")
            ## function ##

    # The factory used to fall off the end and return None, so callers
    # failed on ``None.delay`` / ``None.AsyncResult``.
    return process_user_task

我尝试了一些方法,都成功构建了容器,但同样存在未注册任务的问题。

Flask 应用程序完全创建后初始化 Celery - app.py:

    return app

app = create_app()

# Initialize Celery after the Flask app is fully created.
# NOTE(review): `celery` is only usable here if init_celery returns the
# Celery instance -- confirm against celery_app.py.
from celery_app import init_celery
celery = init_celery(app)

将任务导入celery_app.py:

    app.logger.info("Celery configured successfully")

    # Import tasks to ensure they are registered with Celery
    from tasks import register_tasks
    register_tasks(celery)

    # List registered tasks
    list_registered_tasks(celery)

    return celery

使用celery_instance.py中的celery.autodiscover_tasks:

def make_celery(app=None):
    """Create a Celery instance and, when a Flask app is given, bind it.

    Args:
        app: Optional Flask application. When given, its config is copied
            into the Celery config, task autodiscovery is requested, and
            tasks are run inside the app context.

    Returns:
        The configured Celery instance.
    """
    main_name = __name__ if not app else app.import_name
    result_backend = os.getenv("CELERY_RESULT_BACKEND", "redis://redis:6379/0")
    broker_url = os.getenv("CELERY_BROKER_URL", "redis://redis:6379/0")
    celery = Celery(main_name, backend=result_backend, broker=broker_url)

    # Without a Flask app there is nothing more to configure.
    if not app:
        return celery

    celery.conf.update(app.config)
    # NOTE(review): autodiscover_tasks expects package names; this string
    # looks like "module.task" with a plural typo -- confirm the intended value.
    celery.autodiscover_tasks(['tasks.process_user_tasks'], force=True)

    class ContextTask(celery.Task):
        """Task base class that runs the task body inside the Flask app context."""

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

还查看了一些 StackExchange 帖子(如下所示),并实现了一个单独的 celery_worker.py 文件,以将 Celery 任务保留在资源类之外——容器可以构建,但仍存在同样的问题。参考帖子:《Celery/Flask 接收未注册类型的任务(应用程序工厂 + 蓝图)》:

import os
from app import create_app
from celery_app import init_celery

# Build the app and Celery handle, then push an application context (it is
# never popped here) so code run from this module sees an active context.
app, celery = create_app()
app.app_context().push()

任何帮助或指示将不胜感激,谢谢。

python docker flask celery
1个回答
0
投票

只是用我最终采用的解决方案来回答这个问题。我没有使用 Celery,而是使用了 Redis-Rq,它更容易实现。

© www.soinside.com 2019 - 2024. All rights reserved.