在 Celery Beat 中查找周期性任务的下一次执行时间

问题描述 投票:0回答:1

我正在创建周期性任务,以便用户可以使用以下代码访问下一课:

task = PeriodicTask.objects.create(
            interval=enrolment.course.interval,
            name=f"enrolment_id: {enrolment.id}",
            task="courses.tasks.next_program",
            args=json.dumps([enrolment.id]),
        )
enrolment.task = task
enrolment.save()

如何获取下一个任务将运行的日期时间?

python django celery celerybeat
1个回答
0
投票

我用的是celery内置的

schedule.remaining_estimate
:

def datetime_next_lesson(self):
    task = self.task
    if not task: # just in case
        return 0
    # if task hasn't been run yet, last_run_at will be null, use datetime_created in this case
    last_run_at = task.last_run_at or task.start_time or self.datetime_created
    return timezone.now() + task.schedule.remaining_estimate(last_run_at)
© www.soinside.com 2019 - 2024. All rights reserved.