Celery 任务未执行,但工作程序和节拍运行完美

问题描述 投票:0回答:1

我有一个运行没有问题的应用程序,我最近从 APScheduler 切换到 Celery。 我已经为其设置了 redis 服务器,并且 celery 任务似乎从外部运行顺利。 我已经启动了芹菜工人并在不同的终端上击败。 每当执行任务时,我的应用程序都应该给出输出。但没有输出

我的 app.py 的代码:

#I have imported everything required

#creating the app
app = Flask(__name__, static_folder='static')

#configuring the celery app
app.config['broker_url'] = 'redis://localhost:6379/0'
app.config['result_backend'] = 'redis://localhost:6379/0'

#creating the celery app
celery = Celery(app.name, broker=app.config['broker_url'])
celery.conf.update(app.config)

app.secret_key = '9557186409'

#configuring the database
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///site.db'
db.init_app(app)

webhook_url = "https://chat.googleapis.com/v1/spaces/AAAA0XTp2i8/messages?key=AIzaSyDdI0hCZtE6vySjMm-WEfRq3CPzqKqqsHI&token=4sX2rjnS34sBcb6Q7tKN800_IHQbcHCB76lW_0rbxqw"


def send_message_to_chat(message):
    data = {"text": message}
    response = requests.post(webhook_url, json=data)
    print(f"Message sent to chat: {response.status_code}")

with app.app_context():
    #creating the databases
    db.create_all()

index_routes(app)
librarian_routes(app)
student_routes(app)


login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login_session_not_found'


@login_manager.user_loader
def load_user(user_id):
    return users_db.query.get(user_id)

@celery.task
def test_celery():
    print("Celery task executed at ", datetime.now())

@celery.task
def check_expiry():
    try:
        print("Executing check_expiry task at ", datetime.now())
        issued = requests_db.query.filter_by(status='issued').all()
        for issue in issued:
            if issue.end_date and issue.end_date < datetime.now():
                print(f"Issue {issue.id} is expired.")
                issue.status = "completed"
        db.session.commit()
        print("check_expiry task completed.")
    except Exception as e:
        print(f"Error in check_expiry task: {e}")


@celery.task
def send_reminder():
    with app.app_context():
        print("Sending reminders ", datetime.now())
        send_message_to_chat("Sending reminders")
        current_time = datetime.now()
        for user in users_db.query.all():
            #check if current time is the preferred time of the user, if last reminder was not sent in the current date
            if user.preferred_hour == current_time.hour:
                if user.last_reminder_datetime == None or user.last_reminder_datetime.date() != current_time.date():
                    send_message_to_chat(f"Reminder for {user.username}")
                    user.last_reminder_datetime = current_time
                    db.session.commit()

celery.conf.beat_schedule = {
    "check_expiry": {
        "task": "app.check_expiry",
        #runs every minute
        "schedule": crontab(minute="*")
    },
    "send_reminder": {
        "task": "app.send_reminder",
        #runs every hour
        "schedule": crontab(minute="*")
    },
    "test_celery": {
        "task": "app.test_celery",
        #runs every minute
        "schedule": crontab(minute="*")
    }
}

#starting the app
if __name__ == '__main__':
    app.run()
    

我正在使用 venv 来运行这个应用程序。我尝试过运行worker并在venv内部和外部进行击败。

Worker初始化:

celery -A app.celery worker --loglevel=info
节拍初始化:
celery -A app.celery beat --loglevel=info

工人终端:

[2024-07-31 16:38:00,004: INFO/MainProcess] Task app.test_celery[1f068e64-4cb4-43ad-b1d3-f7cd91bd0a5f] received
[2024-07-31 16:38:00,007: INFO/MainProcess] Task app.send_reminder[9520a809-4833-43b3-9964-520d13f26fa2] received
[2024-07-31 16:38:00,008: INFO/MainProcess] Task app.check_expiry[26f91af5-c07e-4acd-b816-a36cc299a947] received

击败终端:

[2024-07-31 16:38:00,000: INFO/MainProcess] Scheduler: Sending due task test_celery (app.test_celery)
[2024-07-31 16:38:00,001: INFO/MainProcess] Scheduler: Sending due task send_reminder (app.send_reminder)
[2024-07-31 16:38:00,004: INFO/MainProcess] Scheduler: Sending due task check_expiry (app.check_expiry)

所有这些都没有Python输出

我尝试删除 debug=true 选项,我尝试创建一个简单的 celery 任务来检查内容,我尝试重新启动应用程序、worker 和beat。 我也尝试向 gpt4o 寻求帮助,但仍然一无所获。 仍然没有输出。

flask redis celery celery-task celerybeat
1个回答
0
投票

如果您在 Windows 上运行,请使用

 celery -A app.celery worker --loglevel=info -P solo

默认的 prefork 在 Windows 中不起作用。 查看这篇文章

© www.soinside.com 2019 - 2024. All rights reserved.