我有一个包含动态计划任务的时间和元数据的数据库。
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from concentrApp import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'concentrApp.settings')
app = Celery('concentrApp')
# You can add celery settings in settings.py starting with CELERY_
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda : settings.INSTALLED_APPS)
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'concentrApp.settings')
app.conf.timezone = "Asia/Jerusalem"
我制作了从数据库中获取数据并插入到 celery 中的函数:
from datetime import datetime
from application.models import Schedule
from concentrApp.celery import app
from celery.schedules import crontab
def get_data_from_database():
return Schedule.objects.all()
# hour, min should be int, change day_of_week to "0-3"
def create_celery_schedule(hour, minute):
return crontab(minute=int(minute), hour=int(hour), day_of_week="0-6")
def update_or_delete_scheduled_task(task_data, new_schedule=None):
task_name = task_data.id
expo_token = task_data.participant.expo_token
if new_schedule:
app.conf.beat_schedule[task_name] = {
"task": "concentrApp.tasks.notify", # Replace with the actual task path
"schedule": new_schedule,
"args": (expo_token, task_data.context,), # Additional arguments if needed
}
else:
del app.conf.beat_schedule[task_name]
def update_celery_beat_schedule():
tasks = get_data_from_database()
for task_data in tasks:
hour, minute = task_data.ping_times.split(':')
schedule = create_celery_schedule(hour, minute)
update_or_delete_scheduled_task(task_data, new_schedule=schedule)
print(app.conf.beat_schedule)
@app.task(name="notify")
def notify(participant_token, context):
current_datetime = datetime.datetime.now()
formatted_datetime = current_datetime.strftime("%Y-%m-%d %H:%M:%S")
with open('output.txt', 'a') as file:
file.write(formatted_datetime + participant_token + '\n')
当我打印app.conf.beat_schedule时,我可以从数据库中看到预定的数据。 celery -A concentrAppbeat -l info 正在运行,celery 工作器也在运行。 但到了时候任务就不会发生。
我做错了什么?
我遇到了同样的问题,我认为 celery 中的 PersistentScheduler 不支持实时更新。
顺便说一句,不要相信 ChatGPT 写的任何东西,有时它是一堆废话......(在研究了 ChatGPT 后,我对这段代码很熟悉)