我已经安排了 celerybeat 任务每 3 小时运行一次:
'sync_stuff': {
'task': 'celery_tasks.sync_stuff',
'schedule': crontab(hour='*/3')
}
有时需要超过 3 小时才能完成任务,我想确保 celery 不会在旧实例仍在运行时再次调度和运行该任务。
有没有办法仅通过 celery 或 celerybeat 设置来做到这一点?
不幸的是,您必须自己实现锁定策略。
阅读文档的这一部分以获取更多详细信息:
与 cron 一样,如果第一个任务没有在下一个任务之前完成,任务可能会重叠。如果这是一个问题,您应该使用锁定策略来确保一次只能运行一个实例(例如,请参阅确保一次只执行一个任务)。
来源:
http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#entries http://docs.celeryproject.org/en/latest/tutorials/task-cookbook.html#cookbook-task-serial
我以稍微不同的方式解决了这个问题,方法是覆盖 Task 类的 before_start motoda 并检查是否已经有一个具有该名称的任务正在运行,如果有,新任务将被撤销。
from celery import Celery, Task
def is_task_running(task_name: str | None) -> bool:
from django_celery_results.models import TaskResult
return (
TaskResult.objects.filter(
task_name=task_name, status__in=["PENDING", "STARTED", "RETRY"]
).count()
> 1
)
def revoke_task(task_id: str, celery_app):
celery_app.control.revoke(task_id, terminate=True)
class ExtendedTask(Task):
def before_start(self, task_id, args, kwargs):
if is_task_running(self.name):
revoke_task(task_id, self.app)
app = Celery("app", task_cls=ExtendedTask)