如何使用 Celery 让我的 web 应用程序在特定日期的特定时间执行 python 函数?

问题描述 投票:0回答:1

我正在制作一个存储在谷歌云上的网络应用程序。对于我的后端,我使用 Flask、Python、Celery 和 RabbitMQ。对于我的前端,我使用 React、JavaScript 和 HTML/CSS。我在 VS Code 上编码。

我尝试制作一个schedule.json来确定应该在哪一天调用我的函数,但为了测试我只每X分钟输入一次“print hello”,因为我认为如果我理解它的工作方式,那么扩展它就会很容易到我的日历:

{
    "print-hello-14-10": {
        "task": "app.tasks.print_hello",
        "schedule": {
            "hour": 14,
            "minute": 10
        }
    },
    "print-hello-14-11": {
        "task": "app.tasks.print_hello",
        "schedule": {
            "hour": 14,
            "minute": 11
        }
    }
}

我制作了四个 .py 文件。 我的 init.py 文件:

from flask import Flask

def create_app():
    app = Flask(__name__)
    app.config.update(
        CELERY_BROKER_URL='amqps://<username>:<password>@<hostname>/<vhost>',
        CELERY_RESULT_BACKEND='rpc://',
    )
    from .tasks import init_celery
    init_celery(app)
    return app

我的make_celery.py文件:

from celery import Celery

def make_celery(app):
    celery = Celery(
        app.import_name,
        broker=app.config['CELERY_BROKER_URL'],
        backend=app.config['CELERY_RESULT_BACKEND'],
    )
    celery.conf.update(app.config)
    return celery

我的run.py文件:

from app import create_app

app = create_app()

if __name__ == '__main__':
    app.run(debug=True)

我的tasks.py文件:

import os
import json
from celery import Celery
from celery.schedules import crontab

celery = Celery(__name__)

def init_celery(app=None):
    app.config.update(
        CELERY_BROKER_URL='amqp://<username>:<password>@<hostname>/<vhost>',
        CELERY_RESULT_BACKEND='rpc://',
    )
    celery.conf.update(app.config)
    if app:
        celery.conf.update(app.config)
    update_schedule_from_json()

@celery.task
def print_hello():
    print('Hello')

def update_schedule_from_json():
    schedule_file = 'schedule.json'
    if os.path.exists(schedule_file):
        with open(schedule_file, 'r') as f:
            schedule_data = json.load(f)
        celery.conf.beat_schedule = {}
        for key, value in schedule_data.items():
            celery.conf.beat_schedule[key] = {
                'task': value['task'],
                'schedule': crontab(hour=value['schedule']['hour'], minute=value['schedule']['minute']),
            }

# Call this function to update the schedule whenever necessary
update_schedule_from_json()

我尝试通过像这样运行我的 Flask 应用程序来启动它

python run.py
,然后使用
celery -A app.tasks worker --loglevel=info
在终端中运行我的 celery 工作程序,并使用
celery -A app.tasks beat --loglevel=info
在单独的终端中运行我的 celery 节拍调度程序。

它不起作用,尽管当我使用固定间隔(没有所有调度内容)时,我设法让我的 print hello 工作。所以我认为我的调度程序有问题,但我不知道要修改什么。我请求你的帮助。

python flask rabbitmq celery-task celerybeat
1个回答
0
投票

您想要将

day_of_week
传递到
crontab
调用中。您可以在此处的 celery 文档中找到示例:https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html#crontab-schedules

© www.soinside.com 2019 - 2024. All rights reserved.