有没有有效的方法来检索特定队列中当前正在执行的 celery 任务的数量?

问题描述 投票:0回答:1

我想知道当前正在执行的 celery 任务和等待任务的数量。

但是,所有的 celery 检查命令都显示不正确的结果,并且似乎不太容易使用它们。

我的芹菜信息如下。

- ** ---------- [config]
- ** ---------- .> app:         myapp:0x7f72b15195c0
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     redis://localhost/
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> myqueue_1     exchange=myqueue_1(direct) key=myqueue_1

我尝试通过以下方式计算数量。

def get_queue_task_count(queue_name):

    inspector = Inspect(app=celery_app)
    active_tasks = inspector.active()

    specific_queue_routing_keys = set()
    for worker, tasks in active_tasks.items():
        for task in tasks:
            if task['delivery_info']['routing_key'] == queue_name:
                specific_queue_routing_keys.add(task['delivery_info']['routing_key'])

    return len(specific_queue_routing_keys)

在这段代码中,我期望这个函数可以返回当前在特定队列中执行的任务数, 但每次我调用这个函数时,inspector.active()都会返回不同的信息,我认为这是因为同一服务器上运行的其他队列。(队列1,队列2,队列3..我只想知道有关任务中的任务队列1!)

这种情况,有什么有效的方法来获取特定队列的信息吗?

rabbitmq celery django-celery celery-task
1个回答
0
投票

您可以使用 Celery 的内置监控工具

celery.events.state.State
来获取某个队列中正在运行和等待的作业数量的准确计数。这使您可以实时访问有关任务和队列的信息。以下是如何使用它来找出某个队列中有多少作业。

from celery import Celery
from celery.events.state import State

app = Celery('myapp')

# Assuming you have a running Celery worker with events enabled
state = State(app=app)

def get_queue_task_count(queue_name):
    # Get the current state of tasks
    state.refresh()

    # Get information about the specific queue
    queue = state.task_consumer.queues.get(queue_name)

    if queue:
        # Get the number of tasks currently in the queue
        waiting_tasks = len(queue._queue)

        # Get the number of tasks currently being processed
        active_tasks = len(queue.active_tasks)

        return {
            "queue_name": queue_name,
            "waiting_tasks": waiting_tasks,
            "active_tasks": active_tasks
        }
    else:
        return {
            "queue_name": queue_name,
            "waiting_tasks": 0,
            "active_tasks": 0
        }

# Example usage
queue_name = 'myqueue_1'
queue_info = get_queue_task_count(queue_name)
print(f"Queue '{queue_info['queue_name']}': Waiting tasks: {queue_info['waiting_tasks']}, Active tasks: {queue_info['active_tasks']}")

术语

status
指的是任务和工人的当前状态。要使用最新信息更新状态,请使用
state.refresh()
函数。然后,您可以查看有关队列的详细信息,例如等待任务和活动任务的数量。如果队列不存在或没有作业,则返回适当的默认值。为了使此代码正确运行,您必须让 Celery 工作线程在启用事件的情况下运行。

© www.soinside.com 2019 - 2024. All rights reserved.