我在使用 Django 开发应用程序并使用 Celery 基于模型字段安排任务时遇到了挑战。我想分享我的解决方案并寻求有关潜在改进的建议。作为一个相对缺乏经验的开发人员,我欢迎任何指导。
我正在开发一个用于管理培训课程的 Django 应用程序,我需要在注册截止日期到来时安排任务。我已经实现了 Celery 来进行任务调度,并将任务 ID 存储在模型字段中,以便在必要时进行撤销。我的挑战是确保当注册截止日期更改时,如果截止日期已过,计划的任务也会更新或执行。
from datetime import datetime
from celery.result import AsyncResult
from django.db import transaction
from django.db.models.signals import post_save, pre_delete, pre_save
from django.dispatch import receiver
from django.shortcuts import get_object_or_404
from django.utils import timezone
from .models import Training
from .tasks import post_registration_deadline
def delay_and_assign_registration_deadline_task(training_id):
"""
The post_registration_deadline notifies the members of the training manager group if the minimum number of
candidates is not reached. If there is still some registrations to review, they are canceled with the reason
"REGISTRATION_DEADLINE_PASSED".
"""
training = get_object_or_404(Training, id=training_id)
task_id = post_registration_deadline.delay(training.id).task_id
training.registration_deadline_task = task_id
training.save()
@receiver(pre_save, sender=Training)
def pre_save_training(sender, instance, **kwargs):
if instance.type.registration_enabled:
"""
if the training is already created, we check if the registration deadline has changed
if it has, we revoke the previous task. If the new deadline is now or before, we execute the task.
If not, we schedule it for the new deadline date.
"""
if instance.pk:
previous = get_object_or_404(Training, id=instance.id)
if previous.registration_deadline != instance.registration_deadline:
task = AsyncResult(str(getattr(instance, 'registration_deadline_task')))
task.revoke()
if instance.registration_deadline <= timezone.now().date():
transaction.on_commit(lambda: delay_and_assign_registration_deadline_task(instance.id))
else:
eta_time = datetime.combine(
instance.registration_deadline,
datetime.max.time(),
tzinfo=timezone.utc
)
instance.registration_deadline_task = post_registration_deadline.apply_async(
eta=eta_time, args=[instance.id]
).task_id
@receiver(post_save, sender=Training)
def post_save_training(sender, instance, created, **kwargs):
"""
if, at training creation, the registration deadline is today or before, we start the
post_registration_deadline_task if not, we schedule it for the registration deadline date.
"""
if created:
if instance.type.registration_enabled:
if instance.registration_deadline <= timezone.now().date():
transaction.on_commit(lambda: delay_and_assign_registration_deadline_task(instance.id))
else:
eta_time = datetime.combine(
instance.registration_deadline,
datetime.max.time(),
tzinfo=timezone.utc
)
instance.registration_deadline_task = post_registration_deadline.apply_async(
eta=eta_time, args=[instance.id]
).task_id
@receiver(pre_delete, sender=Training)
def pre_delete_training(sender, instance, **kwargs):
registration_deadline_task = AsyncResult(str(getattr(instance, 'registration_deadline_task')))
registration_deadline_task.revoke()
考虑到任务日期将来可能会发生变化,使用周期性任务通常更灵活且更易于管理。原因如下:
delay
:delay
:您需要跟踪
apply_async()
返回的任务 ID,并在日期发生变化时使用它来撤销任务。然后,您可以使用新日期重新安排时间。
from celery.task.control import revoke
# To cancel a task
revoke(task_id, terminate=True)
您只需更新时间表即可。如果您使用 Django-Celery-Beat,这可以通过 Django 管理界面或以编程方式完成。
将任务详细信息(包括其预期执行日期)存储在数据库中。 添加一个字段来指示任务是否已执行。
创建定期运行的任务(例如,每分钟或每小时)。 在此任务中,根据预期执行日期以及是否已执行,在数据库中查询应执行的任何任务。 执行符合条件的任务并在数据库中更新其状态。
这种方法使您可以通过简单地更新数据库中的execute_at字段来灵活地更改执行日期。定期任务将在下次运行时获取此更改。
如果上述方法有问题或有任何疑问,请告诉我