Celery beat dynamic scheduler


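I have a database that holds the times and metadata of dynamically scheduled tasks. My Celery app is set up in concentrApp/celery.py: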

from __future__ import absolute_import, unicode_literals

import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
# This must happen before the app is created and configured.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'concentrApp.settings')

app = Celery('concentrApp')

# You can add celery settings in settings.py starting with CELERY_
app.config_from_object('django.conf:settings', namespace='CELERY')
# Discover tasks.py modules in all installed Django apps.
app.autodiscover_tasks()

app.conf.timezone = "Asia/Jerusalem"

I wrote functions that fetch the data from the database and insert it into Celery:

from datetime import datetime
from application.models import Schedule
from concentrApp.celery import app
from celery.schedules import crontab


def get_data_from_database():
    return Schedule.objects.all()


# hour and minute are cast to int; day_of_week="0-6" means every day
# (narrow the range, e.g. "0-3", to run on fewer days)
def create_celery_schedule(hour, minute):
    return crontab(minute=int(minute), hour=int(hour), day_of_week="0-6")


def update_or_delete_scheduled_task(task_data, new_schedule=None):
    # beat_schedule keys are entry names, so use a string rather than the raw id
    task_name = str(task_data.id)
    expo_token = task_data.participant.expo_token
    if new_schedule:
        app.conf.beat_schedule[task_name] = {
            # Must match the name the task is registered under (see name="notify" below)
            "task": "notify",
            "schedule": new_schedule,
            "args": (expo_token, task_data.context),
        }
    else:
        # pop() avoids a KeyError when the entry was never scheduled
        app.conf.beat_schedule.pop(task_name, None)


def update_celery_beat_schedule():
    tasks = get_data_from_database()

    for task_data in tasks:
        hour, minute = task_data.ping_times.split(':')
        schedule = create_celery_schedule(hour, minute)
        update_or_delete_scheduled_task(task_data, new_schedule=schedule)

    print(app.conf.beat_schedule)


@app.task(name="notify")
def notify(participant_token, context):
    # `datetime` here is the class imported via `from datetime import datetime`
    current_datetime = datetime.now()
    formatted_datetime = current_datetime.strftime("%Y-%m-%d %H:%M:%S")
    with open('output.txt', 'a') as file:
        file.write(formatted_datetime + participant_token + '\n')

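When I print app.conf.beat_schedule, I can see the scheduled entries from the database. celery -A concentrApp beat -l info is running, and the Celery worker is running too. But when the scheduled time comes, the task does not run.

What am I doing wrong?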

python django celery celery-task celerybeat
1 Answer

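I ran into the same problem. As far as I can tell, Celery's default PersistentScheduler does not support live updates: the beat process loads its schedule when it starts, so entries added to app.conf.beat_schedule afterwards are not picked up.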
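
One possible way around this, assuming the django-celery-beat package is an option for the project (it is not mentioned in the question), is to store the entries in its database tables and run beat with --scheduler django_celery_beat.schedulers:DatabaseScheduler. The sketch below is illustrative only: crontab_fields, sync_periodic_task and the "notify-<id>" entry names are hypothetical, and the row attributes (id, ping_times, context, participant.expo_token) are taken from the question's Schedule model.

```python
import json


def crontab_fields(ping_time):
    """Turn an 'HH:MM' string into field values for a crontab row (every day)."""
    hour, minute = ping_time.split(":")
    return {
        "minute": str(int(minute)),
        "hour": str(int(hour)),
        "day_of_week": "*",
        "day_of_month": "*",
        "month_of_year": "*",
    }


def sync_periodic_task(row):
    """Create or update the database-backed beat entry for one Schedule row."""
    # Imported lazily: these models need a configured Django project with
    # 'django_celery_beat' in INSTALLED_APPS (an assumption, not in the question).
    from django_celery_beat.models import CrontabSchedule, PeriodicTask

    crontab, _ = CrontabSchedule.objects.get_or_create(**crontab_fields(row.ping_times))
    PeriodicTask.objects.update_or_create(
        name=f"notify-{row.id}",  # hypothetical naming scheme, one entry per row
        defaults={
            "crontab": crontab,
            "task": "notify",  # the name the task is registered under
            "args": json.dumps([row.participant.expo_token, row.context]),
        },
    )
```

Calling sync_periodic_task for each row of Schedule.objects.all() would replace the loop in update_celery_beat_schedule. With this kind of scheduler the entries live in the database rather than in one process's memory, which is why changes can reach a beat process that is already running.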

By the way, don't trust everything ChatGPT writes; sometimes it is a pile of nonsense... (I recognize this code from my own experiments with ChatGPT.)
