Celery 任务在 Django 测试中不会抛出异常

问题描述 投票:0回答:3

我的 Django 测试中包含几个 celery 任务。不幸的是,当通过 .delay() 调用任务时,不会抛出异常。我将 CELERY_ALWAYS_EAGER 设置为 True。

任务.py

import celeryapp as app

@app.task()
def exception_task():
    print 'CELERY_ALWAYS_EAGER:', app.conf['CELERY_ALWAYS_EAGER']
    raise Exception('foo')

测试.py

def test_exception_in_task(self):
        from tasks import exception_task
        exception_task.delay()

输出

CELERY_ALWAYS_EAGER: True
.
----------------------------------------------------------------------
Ran 1 test in 0.686s

删除 .delay 时,测试退出并出现预期错误:

ERROR: test_exception_in_task
Exception: foo

版本

celery==3.1.4
Django==1.6.4
python django testing celery celery-task
3个回答
16
投票

似乎我还必须将 CELERY_EAGER_PROPAGATES_EXCEPTIONS 设置为 True。

芹菜>=5.3.1

CELERY_TASK_EAGER_PROPAGATES


5
投票

在 celery 4.0 下,我必须使用 CELERY_TASK_EAGER_PROPAGATES


0
投票
@override_settings(task_always_eager=True, task_eager_propagates=True)
def test_my_task(self):
    pass

这对我使用 Celery 5 有用。

有关配置名称更改的文档:https://docs.celeryq.dev/en/stable/userguide/configuration.html

© www.soinside.com 2019 - 2024. All rights reserved.