如何使用 pytest 测试 Celery 实例的任务?

问题描述 投票:0回答:2

如何使用 pytest 测试 Celery 实例的任务?我不是在谈论使用 pytest 测试使用

@shared_task
装饰器创建的 Celery 任务。为此,已经有一个好的解决方案。我说的是使用
app.task
装饰器创建的测试任务,其中
app
是一个 Celery 应用程序,一个
Celery
实例。

celery celery-task
2个回答
0
投票

下面的代码对我有用。

celery_worker.reload()
重新注册任务。

def test_xxx(celery_app, celery_worker):
    @celery_app.task
    def mul(x, y):
        return x * y

    celery_worker.reload()
    assert mul.delay(4, 4).get(timeout=10) == 16

0
投票

我找不到任何关于此的官方文档。以下是我的一些实验的总结。

我的版本

celery[pytest]==5.3.6
pytest==7.4.3

基础知识:https://docs.celeryq.dev/en/stable/userguide/testing.html

celery_worker 可以如下配置。

@pytest.fixture(scope='session')
def celery_worker_parameters():
    return {"without_heartbeat": True}

查看 https://github.com/celery/celery/blob/da1146ab60065847b9742bb61190d52a7a2c5fdf/celery/contrib/testing/worker.py#L155 获取 celery_worker_parameters 提示。

这是如何运作的? celery_worker 在其签名中包含 celery_worker_parameters 固定装置。请参阅 https://github.com/celery/celery/blob/da1146ab60065847b9742bb61190d52a7a2c5fdf/celery/contrib/pytest.py#L199https://github.com/celery/celery/blob/da1146ab60065847b9742bb61190d5 2a7a2c5fdf/芹菜/contrib/ pytest.py#L160

您可以尝试以下方法来配置 celery_app 并看看它是否适合您:

@pytest.mark.celery(
  broker_url="memory://localhost:8000/"
)
def test_how_to_use_celery_app_and_worker(
  celery_app,
  celery_worker,
)

Celery 加载标记并自行配置。请参阅此处:https://github.com/celery/celery/blob/da1146ab60065847b9742bb61190d52a7a2c5fdf/celery/contrib/pytest.py#L72

我的测试环境有一些限制。我在 conftest.py 中使用了一个固定装置来进行 celery_app 配置。

@pytest.fixture
def os_environ_celery_app_cfg():
    with mock.patch.dict(os.environ, {
        "CELERY_BROKER_URL": "memory://localhost:8000/",
    }) as mock_environ:
        yield mock_environ

有关这些环境变量的提示,请参阅 https://github.com/celery/celery/blob/da1146ab60065847b9742bb61190d52a7a2c5fdf/celery/app/utils.py#L103

我的单元测试:

def test_how_to_use_celery_app_and_worker(
  os_environ_celery_app_cfg,
  celery_app,
  celery_worker,
)

  @celery_app.task(bind=True) # ** ensure @celery_app prefix **
  def my_worker(self: Task, *args, **kwargs):
    pass

  celery_worker.reload() # ** call after task declaration **

  my_worker.apply_async()

声明异步任务后调用 celery_worker.reload() 非常重要。这是因为该任务的 Celery 策略需要重新配置。请参阅https://github.com/celery/celery/blob/da1146ab60065847b9742bb61190d52a7a2c5fdf/celery/worker/worker.py#L273

另外,确保任务装饰器具有 @celery_app 前缀:

@celery_app.task

单元测试中的异步任务将在工作线程中运行。单元测试装置引入的任何效果/好处将在工作线程中不可用。

© www.soinside.com 2019 - 2024. All rights reserved.