Celery integration testing with pytest and monkeypatching

Problem description

I am trying to run some simple integration tests with Python 3.9.5, Celery 5.2.6, pytest 7.1.0 and FastAPI on Docker.

Project structure:

  📦
  ┣ 📂app
  ┃ ┣ 📂api
  ┃ ┃ ┗ 📂routes
  ┃ ┃   ┗ 📜celery.py
  ┃ ┣ 📂services
  ┃ ┃ ┗ 📜operations.py
  ┃ ┗ 📂celery
  ┃   ┣ 📜tasks.py
  ┃   ┣ 📜utils.py
  ┃   ┗ 📜worker.py
  ┗ 📂tests
    ┣ 📜conftest.py
    ┗ 📜test_celery.py

The Celery container runs:

command: celery -A app.celery.worker:celery_app worker --loglevel INFO --logfile=./celery.log --hostname=worker_1_dev@%h

and works as expected.

The web server container runs:

command: uvicorn app.api.server:app

Both containers have bind mounts of the full project directory.

How should the Celery fixtures be defined so that I can monkeypatch operations.py or the tasks themselves? Right now, when my test runs, the actual worker container picks up the real tasks and runs them, so I suspect the problem lies in the fixtures. Setting task_always_eager does not change the behaviour, and it is not recommended anyway. The worker and app instance fixtures return objects along these lines:

celery_worker: gen44@XXX
celery_app: <Celery celery.tests at XXX>

which looks correct: they are not the actual worker container or the app.celery.worker:celery_app instance. I have tried playing with the other fixtures from the docs, mocking approaches, etc., but I have spent far too much time on this and just cannot get the test to work. As I said, the actual container executes the tasks as expected.

app/api/routes/celery.py

from typing import Any, Optional

from celery.result import AsyncResult
from fastapi import Query

# CoreModel, router and celery_app come from project modules (imports omitted here).


class TaskStatus(CoreModel):
    task_id: str
    task_status: Optional[str]
    task_result: Optional[Any]


@router.post(
    "/operations/add-with-factor",
    name="celery:add-times-factor",
)
def add_times_factor(
    x: int = Query(..., gt=0),
    y: int = Query(..., gt=0),
):
    task = celery_app.send_task(
        "add_times_factor",
        args=[x, y],
    )
    return {"task_id": task.id}


@router.get(
    "/tasks/{task_id}/",
    name="celery:get-task-status",
    response_model=TaskStatus,
)
def get_status(task_id):
    task_result = AsyncResult(task_id)
    return TaskStatus(
        task_id=task_id,
        task_status=task_result.status,
        task_result=task_result.result,
    )
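For reference, exercising these two routes directly looks roughly like this. This is a minimal sketch using FastAPI's synchronous TestClient; it assumes the router is mounted without a prefix and that app.api.server exposes the app object named in the uvicorn command above:

from fastapi.testclient import TestClient

from app.api.server import app  # the application from the uvicorn command

client = TestClient(app)

# Enqueue the task by name; the route returns the Celery task id.
resp = client.post("/operations/add-with-factor", params={"x": 1, "y": 2})
task_id = resp.json()["task_id"]

# Poll the status route; task_status stays PENDING until a worker on the
# configured broker has picked the task up and stored a result.
status = client.get(f"/tasks/{task_id}/").json()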

tests/conftest.py

import pytest


@pytest.fixture(scope="session")
def celery_enable_logging():
    return True

@pytest.fixture(scope="session")
def celery_config():
    return {
        "broker_url": "memory://",
        "result_backend": "rpc://",
        "task_always_eager": True, # to be removed
    }

@pytest.fixture(scope="session")
def celery_worker_parameters():
    return {
        "perform_ping_check": False,
    }

@pytest.fixture(scope="session")
def celery_includes():
    return ["app.celery.tasks"]

...
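The celery_app, celery_worker, celery_config, etc. fixtures above are provided by Celery's bundled pytest plugin. If the plugin's entry point is not picked up automatically in your environment, it can be enabled explicitly at the top of conftest.py; a minimal sketch:

# tests/conftest.py
# Load Celery's pytest plugin explicitly so the celery_app / celery_worker
# fixtures from celery.contrib.pytest are available to the test session.
pytest_plugins = ("celery.contrib.pytest",)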

app/celery/worker.py

import os
from celery import Celery

celery_app = Celery("app.celery.worker")
celery_app.conf.task_serializer = "json"
celery_app.conf.result_serializer = "json"
celery_app.conf.accept_content = ["application/json", "application/x-python-serialize"]
celery_app.conf.broker_url = os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379")
celery_app.conf.result_backend = os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379")
celery_app.autodiscover_tasks(["app.celery"])
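Because the route sends the task purely by name ("add_times_factor"), what matters is which broker celery_app points at and whether a worker listening on that broker has the name registered. A quick sanity check of what this app instance knows about, as a sketch (autodiscovery is lazy, so the tasks module is imported explicitly here):

from app.celery.worker import celery_app

# Importing the tasks module registers the @shared_task on the app;
# autodiscover_tasks() would normally do this lazily at worker start-up.
import app.celery.tasks  # noqa: F401

print(sorted(celery_app.tasks))  # "add_times_factor" should be among the entries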

app/celery/tasks.py

import asyncio
import functools

from celery import shared_task

from app.services.operations import OperationService

def async_to_sync(func):
    @functools.wraps(func)
    def wrapped(*args, **kwargs):
        return asyncio.run(func(*args, **kwargs))

    return wrapped

@shared_task(name="add_times_factor")
@async_to_sync
async def add_times_factor_task(x, y):
    operation_service = OperationService()
    result = await operation_service.add_times_factor(x, y)
    return {"result": result}
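The task body itself can be sanity-checked without any broker or worker by running it eagerly; a small sketch, independent of the fixture question:

from app.celery.tasks import add_times_factor_task

# .apply() runs the task in-process and returns an EagerResult.
eager = add_times_factor_task.apply(args=[1, 2])
assert eager.get() == {"result": 6}  # (1 + 2) * FACTOR with the default FACTOR == 2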

app/services/operations.py

FACTOR = 2

class OperationService:
    async def add_times_factor(self, x, y):
        await something  # placeholder for some awaited call in the real service
        return (x + y) * FACTOR
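With the default FACTOR of 2, the request sent in the test below yields (1 + 2) * 2 == 6; the expected 60 only appears if the code that actually executes the task sees the patched FACTOR of 20. The patch itself is only visible inside the test process; a tiny sketch of that scope, unrelated to Celery:

import app.services.operations as operations

def test_factor_is_patchable(monkeypatch):
    # setattr on the imported module object changes what code running in
    # *this* process sees; a worker in another container imports its own
    # copy of the module and keeps FACTOR == 2.
    monkeypatch.setattr(operations, "FACTOR", 20)
    assert operations.FACTOR == 20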

tests/test_celery.py

import asyncio
import json

import pytest
from celery import Celery
from starlette.status import HTTP_200_OK

import app.services.operations as operations

pytestmark = pytest.mark.asyncio


class TestCelery:
    async def test_add_times_factor(
        self,
        app,                # FastAPI application fixture (project-specific)
        superuser_client,   # authenticated async HTTP client fixture (project-specific)
        celery_worker,
        celery_app: Celery,
        monkeypatch,
    ):
        monkeypatch.setattr(operations, "FACTOR", 20)  # ignored right now

        celery_app.set_current()
        res = await superuser_client.post(
            app.url_path_for("celery:add-times-factor"), params={"x": 1, "y": 2}
        )
        task_id = res.json()["task_id"]

        task_succeeded = False
        while not task_succeeded:
            task_status_res = await superuser_client.get(
                app.url_path_for("celery:get-task-status", task_id=task_id)
            )
            assert task_status_res.status_code == HTTP_200_OK
            task_succeeded = task_status_res.json()["task_status"] == "SUCCESS"
            await asyncio.sleep(0.5)

        assert json.loads(task_status_res.json()["task_result"])["result"] == 60  # fails: 6 == 60


python-3.x mocking celery pytest fastapi
1 Answer

The problem is that the celery_worker fixture runs in a separate thread, as described in the docs.

So after patching you should restart the worker, like this:

monkeypatch.setattr(operations, "FACTOR", 20)
celery_worker.reload()
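Applied to the test above, that looks roughly like this (a sketch only; it leaves the rest of the original test unchanged and assumes the task is actually routed to the embedded test worker rather than the worker container):

import pytest

import app.services.operations as operations

pytestmark = pytest.mark.asyncio


class TestCelery:
    async def test_add_times_factor(self, celery_worker, celery_app, monkeypatch):
        # Patch first ...
        monkeypatch.setattr(operations, "FACTOR", 20)
        # ... then restart the embedded worker thread, since it was started
        # before the patch was applied.
        celery_worker.reload()

        celery_app.set_current()
        ...  # send the task and poll for the result, as in the question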