如何记录 django celery 任务中发生的异常

问题描述 投票:0回答:5

我已经使用它们的守护进程指令将 celery 设置为与我的 django 应用程序一起使用(http://docs.celeryproject.org/en/latest/tutorials/daemonizing.html#daemonizing

这是我的测试任务

@periodic_task(run_every=timedelta(seconds=10))
def debugger():
    logger.info("Running debugger")
    raise Exception('Failed')

我需要一种方法来知道该任务(调试器)由于异常而失败。 Celery 的日志文件打印 logger.info("running debugger") 日志,但不记录异常。我是否遗漏了什么,或者我应该以其他方式找到失败的任务?

logging celery django-celery
5个回答
23
投票

问题:

我希望 Celery 捕获异常并将其写入日志文件,而不是明显吞掉它们......

出于专业解决方案的目的,当前的最佳答案是马马虎虎。许多 Python 开发人员会根据具体情况认为全面错误捕获是一个危险信号。评论中清楚地表达了对此的合理厌恶:

等一下,我希望至少在每个失败的任务中都会在工作日志中记录一些内容......

Celery 确实捕获了异常,它只是没有执行 OP 希望它执行的操作(它将其存储在结果后端中)。以下要点是互联网针对此问题提供的最佳要点。它有点过时,但请注意叉子和星星的数量。

https://gist.github.com/darklow/c70a8d1147f05be877c3

要点是采用失败案例并用它做一些定制的事情。这是OP问题的超集。以下是如何调整要点中的解决方案来记录异常。

import logging

logger = logging.getLogger('your.desired.logger')


class LogErrorsTask(Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logger.exception('Celery task failure!!!1', exc_info=exc)
        super(LogErrorsTask, self).on_failure(exc, task_id, args, kwargs, einfo)

您仍然需要确保所有任务都继承自该任务类,并且要点显示了如果您使用

@task
装饰器(带有
base=LogErrorsTask
kwarg)如何执行此操作。

此解决方案的好处是不会将代码嵌套在任何额外的 try- except 上下文中。这是在 celery 已经使用的故障代码路径上搭载的。


8
投票

您可以查看Celery使用指南

from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@app.task
def div():
    try:
        1 / 0
    except ZeroDivisionError:
        logger.exception("Task error")

来自 python 日志模块的文档:

Logger.Exception(msg, *args)

在此记录器上记录一条错误级别的消息。这些参数被解释为 debug()。异常信息添加到日志消息中。此方法只能从异常处理程序中调用。


8
投票

为了接收来自 Celery 任务的所有未处理的异常,我注册了一个信号处理程序。我正在格式化一条

logging.error
消息,然后可以通过默认的 Python 日志配置来处理该消息。

这是相关部分

from celery import signals

@signals.task_retry.connect
@signals.task_failure.connect
@signals.task_revoked.connect
def on_task_failure(**kwargs):
    """Abort transaction on task errors.
    """
    # celery exceptions will not be published to `sys.excepthook`. therefore we have to create another handler here.
    from traceback import format_tb

    log.error('[task:%s:%s]' % (kwargs.get('task_id'), kwargs['sender'].request.correlation_id, )
              + '\n'
              + ''.join(format_tb(kwargs.get('traceback', [])))
              + '\n'
              + str(kwargs.get('exception', '')))

请注意,该信号处理程序自动适用于所有任务;即它不需要更改您的

task
装饰器。


1
投票

使用回溯模块将跟踪捕获为字符串并将其发送到记录器。

try:
    ...
except:
    import traceback
    logger.info(traceback.format_exc())

1
投票

您还可以覆盖 celery 应用程序,以避免向每个

base
装饰器添加
@app.task
kwarg:

import logging
from celery import Celery, Task

logger = logging.getLogger(__name__)

class LoggingTask(Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logger.exception('Task failed: %s' % exc, exc_info=exc)
        super(LoggingTask, self).on_failure(exc, task_id, args, kwargs, einfo)

class LoggingCelery(Celery):
    def task(self, *args, **kwargs):
        kwargs.setdefault('base', LoggingTask)
        return super(LoggingCelery, self).task(*args, **kwargs)

app = LoggingCelery(__name__)
最新问题
© www.soinside.com 2019 - 2025. All rights reserved.