如何不启动相同的任务并等到它用 celerybeat 完成

问题描述 投票:0回答:2

我已经安排了 celerybeat 任务每 3 小时运行一次:

'sync_stuff': {
    'task': 'celery_tasks.sync_stuff',
    'schedule': crontab(hour='*/3')
}

有时需要超过 3 小时才能完成任务,我想确保 celery 不会在旧实例仍在运行时再次调度和运行该任务。

有没有办法仅通过 celery 或 celerybeat 设置来做到这一点?

python celery celerybeat
2个回答
7
投票

不幸的是,您必须自己实现锁定策略

阅读文档的这一部分以获取更多详细信息:

cron 一样,如果第一个任务没有在下一个任务之前完成,任务可能会重叠。如果这是一个问题,您应该使用锁定策略来确保一次只能运行一个实例(例如,请参阅确保一次只执行一个任务)。

来源:

http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#entries http://docs.celeryproject.org/en/latest/tutorials/task-cookbook.html#cookbook-task-serial


0
投票

我以稍微不同的方式解决了这个问题,方法是覆盖 Task 类的 before_start motoda 并检查是否已经有一个具有该名称的任务正在运行,如果有,新任务将被撤销。

from celery import Celery, Task


def is_task_running(task_name: str | None) -> bool:
    from django_celery_results.models import TaskResult
    return (
        TaskResult.objects.filter(
            task_name=task_name, status__in=["PENDING", "STARTED", "RETRY"]
        ).count()
        > 1
    )


def revoke_task(task_id: str, celery_app):
    celery_app.control.revoke(task_id, terminate=True)


class ExtendedTask(Task):
    def before_start(self, task_id, args, kwargs):
        if is_task_running(self.name):
            revoke_task(task_id, self.app)

app = Celery("app", task_cls=ExtendedTask)
© www.soinside.com 2019 - 2025. All rights reserved.