删除celery中所有待处理的任务?

问题描述 投票:0回答:12

如何在不知道每个任务的

task_id
的情况下删除所有待处理任务?

task rabbitmq celery celery-task
12个回答
403
投票

来自文档

$ celery -A proj purge

from proj.celery import app
app.control.purge()

(编辑:使用当前方法更新。)


152
投票

对于芹菜3.0+:

$ celery purge

要清除特定队列:

$ celery -Q queue_name purge

34
投票

对于 Celery 2.x 和 3.x:

例如,当使用带有 -Q 参数的worker来定义队列时

celery worker -Q queue1,queue2,queue3

那么

celery purge
将不起作用,因为您无法将队列参数传递给它。它只会删除默认队列。 解决方案是使用
--purge
参数启动您的工作程序,如下所示:

celery worker -Q queue1,queue2,queue3 --purge

然而,这将运行工作人员。

其他选项是使用 celery 的 amqp 子命令

celery amqp queue.delete queue1
celery amqp queue.delete queue2
celery amqp queue.delete queue3

19
投票

在芹菜 3+ 中:

命令行:

$ celery -A proj purge

以编程方式:

>>> from proj.celery import app
>>> app.control.purge()

http://docs.celeryproject.org/en/latest/faq.html#how-do-i-purge-all-waiting-tasks


10
投票

我发现

celery purge
不适用于我更复杂的芹菜配置。 我出于不同目的使用多个命名队列:

$ sudo rabbitmqctl list_queues -p celery name messages consumers
Listing queues ...  # Output sorted, whitespaced for readability
celery                                          0   2
[email protected]                      0   1
[email protected]                      0   1
apns                                            0   1
[email protected]                        0   1
analytics                                       1   1
[email protected]                   0   1
bcast.361093f1-de68-46c5-adff-d49ea8f164c0      0   1
bcast.a53632b0-c8b8-46d9-bd59-364afe9998c1      0   1
celeryev.c27b070d-b07e-4e37-9dca-dbb45d03fd54   0   1
celeryev.c66a9bed-84bd-40b0-8fe7-4e4d0c002866   0   1
celeryev.b490f71a-be1a-4cd8-ae17-06a713cc2a99   0   1
celeryev.9d023165-ab4a-42cb-86f8-90294b80bd1e   0   1

第一列是队列名称,第二列是队列中等待的消息数量,第三列是该队列的侦听器数量。 队列是:

  • celery - 标准幂等 celery 任务队列
  • apns - Apple 推送通知服务任务队列,不那么幂等
  • 分析 - 长时间运行的夜间分析队列
  • *.pidbox - 工作命令队列,例如关闭和重置,每个工作人员一个(2 个 celery 工作人员、1 个 apns 工作人员、1 个分析工作人员)
  • bcast.* - 广播队列,用于向侦听队列的所有工作人员发送消息(而不仅仅是第一个获取队列的工作人员)
  • celeryev.* - Celery 事件队列,用于报告任务分析

分析任务是一种强力任务,在小数据集上效果很好,但现在需要超过 24 小时才能处理。 有时,会出现问题,并且会卡在等待数据库上。 它需要重写,但在那之前,当它卡住时,我终止任务,清空队列,然后重试。 我通过查看分析队列的消息计数来检测“卡住”,该计数应该为 0(已完成分析)或 1(等待昨晚的分析完成)。 2 或更高是不好的,我会收到一封电子邮件。

celery purge
提供从广播队列之一删除任务,但我没有看到选择不同命名队列的选项。

这是我的过程:

$ sudo /etc/init.d/celeryd stop  # Wait for analytics task to be last one, Ctrl-C
$ ps -ef | grep analytics  # Get the PID of the worker, not the root PID reported by celery
$ sudo kill <PID>
$ sudo /etc/init.d/celeryd stop  # Confim dead
$ python manage.py celery amqp queue.purge analytics
$ sudo rabbitmqctl list_queues -p celery name messages consumers  # Confirm messages is 0
$ sudo /etc/init.d/celeryd start

10
投票

如果你想删除所有挂起的任务以及活动和保留的任务以完全停止 Celery,这对我有用:

from proj.celery import app
from celery.task.control import inspect, revoke

# remove pending tasks
app.control.purge()

# remove active tasks
i = inspect()
jobs = i.active()
for hostname in jobs:
    tasks = jobs[hostname]
    for task in tasks:
        revoke(task['id'], terminate=True)

# remove reserved tasks
jobs = i.reserved()
for hostname in jobs:
    tasks = jobs[hostname]
    for task in tasks:
        revoke(task['id'], terminate=True)

8
投票

在芹菜3+中

http://docs.celeryproject.org/en/3.1/faq.html#how-do-i-purge-all-waiting-tasks

命令行

清除命名队列:

 celery -A proj amqp queue.purge <queue name>

清除配置的队列

celery -A proj purge

我已清除消息,但队列中仍有消息? 答:任务一旦实际执行就会被确认(从队列中删除)。 Worker 收到任务后,需要一段时间才能真正执行,特别是如果已经有很多任务正在等待执行。未确认的消息将由工作线程保留,直到它关闭与代理(AMQP 服务器)的连接。当该连接关闭时(例如,因为工作程序已停止),任务将由代理重新发送到下一个可用的工作程序(或重新启动时的同一个工作程序),因此要正确清除等待任务的队列,您可以必须停止所有工作人员,然后使用 celery.control.purge() 清除任务。

因此,要清除整个队列,必须停止工作人员。


5
投票

对于 Celery 5.0+,从 CLI 执行此操作并针对特定队列:

celery -A APP_NAME purge --queues QUEUE_NAME

如果您想像我一样在步骤中执行此操作,请在末尾添加

-f
选项以跳过确认步骤。


4
投票

芹菜 4+ celery purge 命令清除所有配置的任务队列

celery -A *APPNAME* purge

以编程方式:

from proj.celery import app
app.control.purge()

所有待处理的任务将被清除。 参考:celerydoc


2
投票

1. 要正确清除等待任务队列,您必须停止所有工作人员(http://celery.readthedocs.io/en/latest/faq.html#i-ve-purged-messages-but-there-are-still -消息留在队列中):

$ sudo rabbitmqctl stop

或者(如果 RabbitMQ/消息代理由 Supervisor 管理):

$ sudo supervisorctl stop all

2. ...然后从特定队列中清除任务:

$ cd <source_dir>
$ celery amqp queue.purge <queue name>

3. 启动 RabbitMQ:

$ sudo rabbitmqctl start

或者(如果 RabbitMQ 由 Supervisor 管理):

$ sudo supervisorctl start all

2
投票

对于 Celery 版本 5.0+,使用 RabbitMQ 作为代理

我们需要首先建立从程序到经纪人的新连接, 并将连接与队列绑定以进行清除。

# proj/celery.py
from celery import Celery
app = Celery('proj')
from proj.celery import app
queues = ['queue_A', 'queue_B', 'queue_C']
with app.connection_for_write() as conn:
    conn.connect()
    for queue in queues:
        count = app.amqp.queues[queue].bind(conn).purge()
        print(f'Purge {queue} with {count} message(s)')

0
投票

我认为这可能已经解决了。旧任务仍然在我的 django 管理中的 django-celery-beat> 定期任务中,因此转到 django 管理页面并删除所有这些任务,然后如果您在 docker 容器中,请重新启动它,所有问题都解决了

© www.soinside.com 2019 - 2024. All rights reserved.