我想知道当前正在执行的 celery 任务和等待任务的数量。
但是,所有的 celery 检查命令都显示不正确的结果,并且似乎不太容易使用它们。
我的芹菜信息如下。
- ** ---------- [config]
- ** ---------- .> app: myapp:0x7f72b15195c0
- ** ---------- .> transport: amqp://guest:**@localhost:5672//
- ** ---------- .> results: redis://localhost/
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> myqueue_1 exchange=myqueue_1(direct) key=myqueue_1
我尝试通过以下方式计算数量。
def get_queue_task_count(queue_name):
inspector = Inspect(app=celery_app)
active_tasks = inspector.active()
specific_queue_routing_keys = set()
for worker, tasks in active_tasks.items():
for task in tasks:
if task['delivery_info']['routing_key'] == queue_name:
specific_queue_routing_keys.add(task['delivery_info']['routing_key'])
return len(specific_queue_routing_keys)
在这段代码中,我期望这个函数可以返回当前在特定队列中执行的任务数, 但每次我调用这个函数时,inspector.active()都会返回不同的信息,我认为这是因为同一服务器上运行的其他队列。(队列1,队列2,队列3..我只想知道有关任务中的任务队列1!)
这种情况,有什么有效的方法来获取特定队列的信息吗?
您可以使用 Celery 的内置监控工具
celery.events.state.State
来获取某个队列中正在运行和等待的作业数量的准确计数。这使您可以实时访问有关任务和队列的信息。以下是如何使用它来找出某个队列中有多少作业。
from celery import Celery
from celery.events.state import State
app = Celery('myapp')
# Assuming you have a running Celery worker with events enabled
state = State(app=app)
def get_queue_task_count(queue_name):
# Get the current state of tasks
state.refresh()
# Get information about the specific queue
queue = state.task_consumer.queues.get(queue_name)
if queue:
# Get the number of tasks currently in the queue
waiting_tasks = len(queue._queue)
# Get the number of tasks currently being processed
active_tasks = len(queue.active_tasks)
return {
"queue_name": queue_name,
"waiting_tasks": waiting_tasks,
"active_tasks": active_tasks
}
else:
return {
"queue_name": queue_name,
"waiting_tasks": 0,
"active_tasks": 0
}
# Example usage
queue_name = 'myqueue_1'
queue_info = get_queue_task_count(queue_name)
print(f"Queue '{queue_info['queue_name']}': Waiting tasks: {queue_info['waiting_tasks']}, Active tasks: {queue_info['active_tasks']}")
术语
status
指的是任务和工人的当前状态。要使用最新信息更新状态,请使用 state.refresh()
函数。然后,您可以查看有关队列的详细信息,例如等待任务和活动任务的数量。如果队列不存在或没有作业,则返回适当的默认值。为了使此代码正确运行,您必须让 Celery 工作线程在启用事件的情况下运行。