我正在创建周期性任务,以便用户可以使用以下代码访问下一课:
task = PeriodicTask.objects.create(
interval=enrolment.course.interval,
name=f"enrolment_id: {enrolment.id}",
task="courses.tasks.next_program",
args=json.dumps([enrolment.id]),
)
enrolment.task = task
enrolment.save()
如何获取下一个任务将运行的日期时间?
我用的是celery内置的
schedule.remaining_estimate
:
def datetime_next_lesson(self):
task = self.task
if not task: # just in case
return 0
# if task hasn't been run yet, last_run_at will be null, use datetime_created in this case
last_run_at = task.last_run_at or task.start_time or self.datetime_created
return timezone.now() + task.schedule.remaining_estimate(last_run_at)