我正在使用 Django 和 celerybeat。我想通过 env var (cron 的字符串值)配置 cron 计划。
我们目前正在像这样设置 cron 计划,使用
celery.schedules.crontab
:
CELERY_BEAT_SCHEDULE = {
"task_name": {
"task": "app.tasks.task_function",
"schedule": crontab(minute=0, hour='1,3,5,13,15,17,19,21,23'),
},
}
我想通过传递字符串格式的 crontab 来配置计划,如下所示:
CELERY_BEAT_SCHEDULE = {
"task_name": {
"task": "app.tasks.task_function",
"schedule": '0 15 10 ? * *',
},
}
但是,我找不到任何有关如何执行此操作的文档。上面的天真的尝试是行不通的。我可以将 cron 字符串解析为分钟/小时/天等值,然后将它们与相关的 kwargs 一起传递到
crontab
中,但这感觉很混乱。看来应该有一种优雅的方式来做到这一点。
据我所知,我认为除了创建满足此要求的自定义函数之外,没有直接的方法可以做到这一点。
以下是您可以执行此操作的方法。
如果您还没有
utils.py
/helpers.py
或任何适合您项目的文件,请创建一个新文件,并将此自定义方法添加到其中。
def parse_cron_string(cron_str):
parts = cron_str.split()
minute, hour, day_of_month, month, day_of_week = parts[:5] #first five elements from the split cron string
day_of_month = '*' if day_of_month == '?' else day_of_month
month = '*' if month == '?' else month
day_of_week = '*' if day_of_week == '?' else day_of_week
return {
'minute': minute,
'hour': hour,
'day_of_month': day_of_month,
'month_of_year': month,
'day_of_week': day_of_week,
}
现在在合适的
settings
文件中调用这个新创建的方法:
import os
from celery.schedules import crontab
from .utils import parse_cron_string # Adjust the import path as needed
CRON_STRING = os.getenv('CRON_SCHEDULE', '0 15 10 ? * *')
schedule_values = parse_cron_string(CRON_STRING)
CELERY_BEAT_SCHEDULE = {
"task_name": {
"task": "app.tasks.task_function",
"schedule": crontab(**schedule_values),
},
}
请注意,这是一个如何实现此目标的示例,您可能需要根据您的需求进行调整,但正如您所描述的,这应该满足要求。