查看celery任务是否存在

问题描述 投票:0回答:10

是否可以查出某个任务id的任务是否存在?当我尝试获取状态时,我总是会处于待处理状态。

>>> AsyncResult('...').status
'PENDING'

我想知道给定的任务 id 是否是真正的 celery 任务 id 而不是随机字符串。我想要不同的结果,具体取决于某个 id 是否有有效的任务。

过去可能存在具有相同 id 的有效任务,但结果可能已从后端删除。

python task status celery
10个回答
40
投票

Celery 在发送任务时不会写入状态,这部分是一种优化(请参阅文档)。

如果你真的需要它,添加起来很简单:

from celery import current_app
# `after_task_publish` is available in celery 3.1+
# for older versions use the deprecated `task_sent` signal
from celery.signals import after_task_publish

# when using celery versions older than 4.0, use body instead of headers

@after_task_publish.connect
def update_sent_state(sender=None, headers=None, **kwargs):
    # the task may not exist if sent using `send_task` which
    # sends tasks by name, so fall back to the default result backend
    # if that is the case.
    task = current_app.tasks.get(sender)
    backend = task.backend if task else current_app.backend
 
    backend.store_result(headers['id'], None, "SENT")

然后您可以测试 PENDING 状态以检测任务尚未(似乎) 已发送:

>>> result.state != "PENDING"

10
投票

如果任务 ID 未知,AsyncResult.state 将返回 PENDING。

待定

任务正在等待执行或未知。任何不是的任务 ID known 暗示处于待定状态。

http://docs.celeryproject.org/en/latest/userguide/tasks.html#pending

如果您需要区分未知的 id 和现有的 id,您可以提供自定义任务 id:

>>> from tasks import add
>>> from celery.utils import uuid
>>> r = add.apply_async(args=[1, 2], task_id="celery-task-id-"+uuid())
>>> id = r.task_id
>>> id
'celery-task-id-b774c3f9-5280-4ebe-a770-14a6977090cd'
>>> if not "blubb".startswith("celery-task-id-"): print "Unknown task id"
... 
Unknown task id
>>> if not id.startswith("celery-task-id-"): print "Unknown task id"
... 

3
投票

现在我正在使用以下方案:

  1. 获取任务ID。
  2. 设置为 memcache 键,如 'task_%s' % task.id 消息 'Started'。
  3. 将任务 ID 传递给客户端。
  4. 现在我可以从客户端监控任务状态(从任务消息设置到内存缓存)。
  5. 从就绪任务开始 - 设置为 memcache 键消息“就绪”。
  6. 从客户端任务准备就绪 - 启动特殊任务,从内存缓存中删除密钥并执行必要的清理操作。

0
投票

您需要在您创建的 AsyncTask 对象上调用

.get()
才能实际从后端获取结果。

请参阅 Celery 常见问题解答


进一步澄清我的答案。

从技术上讲,任何字符串都是有效的 ID,无法验证任务 ID。查明任务是否存在的唯一方法是询问后端是否知道该任务,为此您必须使用

.get()

这引入了当后端没有有关您提供的任务 ID 的任何信息时

.get()
阻塞的问题,这是设计使您可以启动任务然后等待其完成。

对于最初的问题,我假设OP想要获取之前完成的任务的状态。为此,您可以传递一个非常小的超时并捕获超时错误:

from celery.exceptions import TimeoutError
try:
    # fetch the result from the backend
    # your backend must be fast enough to return
    # results within 100ms (0.1 seconds)
    result = AsyncResult('blubb').get(timeout=0.1)
except TimeoutError:
    result = None

if result:
    print "Result exists; state=%s" % (result.state,)
else:
    print "Result does not exist"

不言而喻,只有当您的后端存储结果时,这才有效,如果不是,则无法知道任务 ID 是否有效,因为没有任何内容保存它们的记录。


更多澄清。

使用 AMQP 后端无法完成您想要做的事情,因为 它不存储结果,而是转发结果

我的建议是切换到数据库后端,以便结果位于可以在现有 celery 模块之外查询的数据库中。如果结果数据库中不存在任何任务,您可以假设 ID 无效。


0
投票

所以我有这个想法:

import project.celery_tasks as tasks

def task_exist(task_id):
  found = False
  # tasks is my imported task module from celery
  # it is located under /project/project, where the settings.py file is located
  i = tasks.app.control.inspect()
  s = i.scheduled()
  for e in s:
    if task_id in s[e]:
      found = True
      break
  a = i.active()
  if not found:
    for e in a:
      if task_id in a[e]:
        found = True
        break
  r = i.reserved()
  if not found:
    for e in r:
      if task_id in r[e]:
        found = True
        break
  # if checking the status returns pending, yet we found it in any queues... it means it exists...
  # if it returns pending, yet we didn't find it on any of the queues... it doesn't exist
  return found

根据 https://docs.celeryproject.org/en/stable/userguide/monitoring.html 不同类型的队列检查是: 积极的, 预定的, 预订的, 撤销, 挂号的, 统计数据, 查询任务,

所以请随心所欲地挑选。

可能有更好的方法来检查队列中的任务,但这对我来说应该有用,目前。


0
投票

也许直接使用redis是一个很好的解决方案。

pool = redis.ConnectionPool(host=config.REDIS_HOST,
                            port=config.REDIS_PORT,
                            db=config.REDIS_DB,
                            password=config.REDIS_PASSWORD)
redis_client = Redis(connection_pool=pool)

def check_task_exist(id):
    for one in redis_client.lrange('celery', 0, -1):
        task_info = json.loads(one.decode())
        if task_info['headers']['id'] == id:
            return True
    return False

0
投票

我找到了一种检查方法,它对我有用:

def check_task_exists(task_id):
inspector = app.control.inspect()
active_tasks = inspector.active()

# Check active tasks
if active_tasks:
    for worker, tasks in active_tasks.items():
        for task in tasks:
            if task['id'] == task_id:
                return True

# Check scheduled tasks
scheduled_tasks = inspector.scheduled()

if scheduled_tasks:
    for worker, tasks in scheduled_tasks.items():
        if task_id in tasks:
            return True

# Check reserved tasks
reserved_tasks = inspector.reserved()

if reserved_tasks:
    for worker, tasks in reserved_tasks.items():
        if task_id in tasks:
            return True

# Task not found
return False

0
投票

date_done
中有一个属性叫
AsyncResult
对于系统中存在的正确任务 ID,将返回日期时间。 使用这个属性我们可以识别任务是否真实


-4
投票

尝试

AsyncResult('blubb').state

这可能有用。

它应该返回不同的东西。


-4
投票

如有错误请指正。

if built_in_status_check(task_id) == 'pending'
   if registry_exists(task_id) == true
      print 'Pending'
   else
      print 'Task does not exist'
最新问题
© www.soinside.com 2019 - 2025. All rights reserved.