我正在尝试在 docker 上使用 Python 3.9.5、Celery 5.2.6、pytest 7.1.0 和 FastAPI 运行一些简单的集成测试。
项目结构:
📦
┣ 📂app
┣ ┣ 📂api
┣ ┣ ┣ 📂routes
┣ ┣ ┣ ┗ 📜celery.py
┣ ┗ 📂services
┣ ┗ 📜operations.py
┣ ┣ 📂celery
┣ ┃ ┣ 📜tasks.py
┣ ┃ ┣ 📜utils.py
┣ ┃ ┗ 📜worker.py
┣ 📂tests
┣ 📜conftest.py
┗ 📜test_celery.py
芹菜容器运行:
command: celery -A app.celery.worker:celery_app worker --loglevel INFO --logfile=./celery.log --hostname=worker_1_dev@%h
,并且按预期工作。
Web 服务器容器包含:
command: uvicorn app.api.server:app
两者都具有完整项目目录的绑定安装。
应该如何定义 celery 固定装置,以便我可以对
operations.py
或任务本身进行 Monkeypatch?
现在,当我的测试运行时,实际的工作容器会拾取实际任务并运行它们,所以我猜问题出在
在赛程中。设置始终急切不会改变行为,也不建议这样做。
工作线程和应用程序实例设备按照以下方式返回对象:
celery_worker: gen44@XXX
celery_app: <Celery celery.tests at XXX>
这似乎是正确的,它们不是实际的工作容器或
app.celery.worker:celery_app
实例。
我尝试过弄乱文档中的其他装置、模拟方法等,但我在这方面花费了太多时间,并且只是进行了测试
似乎不起作用。就像我说的,实际容器按预期执行任务。
应用程序/api/routes/celery.py
class TaskStatus(CoreModel):
task_id: str
task_status: Optional[str]
task_result: Optional[Any]
@router.post(
"/operations/add-with-factor",
name="celery:add-times-factor",
)
def add_times_factor(
x: int = Query(..., gt=0),
y: int = Query(..., gt=0),
):
task = celery_app.send_task(
"add_times_factor",
args=[x, y],
)
return {"task_id": task.id}
@router.get(
"/tasks/{task_id}/",
name="celery:get-task-status",
response_model=TaskStatus,
)
def get_status(task_id):
task_result = AsyncResult(task_id)
return TaskStatus(
task_id=task_id,
task_status=task_result.status,
task_result=task_result.result,
)
测试/conftest.py
@pytest.fixture(scope="session")
def celery_enable_logging():
return True
@pytest.fixture(scope="session")
def celery_config():
return {
"broker_url": "memory://",
"result_backend": "rpc://",
"task_always_eager": True, # to be removed
}
@pytest.fixture(scope="session")
def celery_worker_parameters():
return {
"perform_ping_check": False,
}
@pytest.fixture(scope="session")
def celery_includes():
return ["app.celery.tasks"]
...
应用程序/celery/worker.py
import os
from celery import Celery
celery_app = Celery("app.celery.worker")
celery_app.conf.task_serializer = "json"
celery_app.conf.result_serializer = "json"
celery_app.conf.accept_content = ["application/json", "application/x-python-serialize"]
celery_app.conf.broker_url = os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379")
celery_app.conf.result_backend = os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379")
celery_app.autodiscover_tasks(["app.celery"])
应用程序/celery/tasks.py
from app.services.operations import OperationService
def async_to_sync(func):
@functools.wraps(func)
def wrapped(*args, **kwargs):
return asyncio.run(func(*args, **kwargs))
return wrapped
@shared_task(name="add_times_factor")
@async_to_sync
async def add_times_factor_task(x, y):
operation_service = OperationService()
result = await operation_service.add_times_factor(x, y)
return {"result": result}
应用程序/服务/操作.py
FACTOR = 2
class OperationService:
async def add_times_factor(self, x, y):
await something
return (x + y) * FACTOR
测试/test_celery.py
import pytest
import app.services.operations as operations
pytestmark = pytest.mark.asyncio
class TestCelery:
async def test_add_times_factor(self,
celery_worker,
celery_app: Celery,
monkeypatch,
):
monkeypatch.setattr(operations, "FACTOR", 20) # ignored right now
celery_app.set_current()
res = await superuser_client.post(app.url_path_for("celery:add-times-factor"), params={"x": 1, "y": 2})
task_succeeded = False
while not task_succeeded:
task_status_res = await superuser_client.get(app.url_path_for("celery:get-task-status", task_id=task.id))
assert task_status_res.status_code == HTTP_200_OK
task_succeeded = task_status_res.json()["task_status"] == "SUCCESS"
await asyncio.sleep(0.5)
assert json.loads(task_status_res.json()["task_result"])["result"] == 60 # fail 6 == 60
问题是 celery_worker 装置根据 doc 在单独的线程中运行。
所以在补丁之后你应该像这样重新启动worker
monkeypatch.setattr(operations, "FACTOR", 20)
celery_worker.reload()