我正在制作一个存储在谷歌云上的网络应用程序。对于我的后端,我使用 Flask、Python、Celery 和 RabbitMQ。对于我的前端,我使用 React、JavaScript 和 HTML/CSS。我在 VS Code 上编码。
我尝试制作一个schedule.json来确定应该在哪一天调用我的函数,但为了测试我只每X分钟输入一次“print hello”,因为我认为如果我理解它的工作方式,那么扩展它就会很容易到我的日历:
{
"print-hello-14-10": {
"task": "app.tasks.print_hello",
"schedule": {
"hour": 14,
"minute": 10
}
},
"print-hello-14-11": {
"task": "app.tasks.print_hello",
"schedule": {
"hour": 14,
"minute": 11
}
}
}
我制作了四个 .py 文件。 我的 init.py 文件:
from flask import Flask
def create_app():
app = Flask(__name__)
app.config.update(
CELERY_BROKER_URL='amqps://<username>:<password>@<hostname>/<vhost>',
CELERY_RESULT_BACKEND='rpc://',
)
from .tasks import init_celery
init_celery(app)
return app
我的make_celery.py文件:
from celery import Celery
def make_celery(app):
celery = Celery(
app.import_name,
broker=app.config['CELERY_BROKER_URL'],
backend=app.config['CELERY_RESULT_BACKEND'],
)
celery.conf.update(app.config)
return celery
我的run.py文件:
from app import create_app
app = create_app()
if __name__ == '__main__':
app.run(debug=True)
我的tasks.py文件:
import os
import json
from celery import Celery
from celery.schedules import crontab
celery = Celery(__name__)
def init_celery(app=None):
app.config.update(
CELERY_BROKER_URL='amqp://<username>:<password>@<hostname>/<vhost>',
CELERY_RESULT_BACKEND='rpc://',
)
celery.conf.update(app.config)
if app:
celery.conf.update(app.config)
update_schedule_from_json()
@celery.task
def print_hello():
print('Hello')
def update_schedule_from_json():
schedule_file = 'schedule.json'
if os.path.exists(schedule_file):
with open(schedule_file, 'r') as f:
schedule_data = json.load(f)
celery.conf.beat_schedule = {}
for key, value in schedule_data.items():
celery.conf.beat_schedule[key] = {
'task': value['task'],
'schedule': crontab(hour=value['schedule']['hour'], minute=value['schedule']['minute']),
}
# Call this function to update the schedule whenever necessary
update_schedule_from_json()
我尝试通过像这样运行我的 Flask 应用程序来启动它
python run.py
,然后使用 celery -A app.tasks worker --loglevel=info
在终端中运行我的 celery 工作程序,并使用 celery -A app.tasks beat --loglevel=info
在单独的终端中运行我的 celery 节拍调度程序。
它不起作用,尽管当我使用固定间隔(没有所有调度内容)时,我设法让我的 print hello 工作。所以我认为我的调度程序有问题,但我不知道要修改什么。我请求你的帮助。
您想要将
day_of_week
传递到 crontab
调用中。您可以在此处的 celery 文档中找到示例:https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html#crontab-schedules