Celery shared_task 无法与 fastapi 中的 pytest 一起使用

问题描述 投票:0回答:1

我正在努力测试包含 celery 任务的 fastapi 端点。我想嘲笑芹菜任务。任务如下

send_email.delay(
        user.email,
        "Subject goes here",
        html_content,
    )

这里是实际的模拟功能

@pytest.fixture
def mocked_send_email(mocker):
    mocker.patch('app.views.user.send_email.delay', return_value=None)
def test_create_user(client, mocked_send_email):

    payload = {}

    response = client.post(
        f"{base_url}/user",
        json=payload,
        headers=admin_headers
    )

    mocked_send_email.assert_called_once_with(
        "[email protected]",
        "Subject goes here",
        mock.ANY
    )

这是我处理芹菜的部分

conftest.py

@pytest.fixture(scope="session")
def celery_config():
    print('running celery config')
    return {"broker_url": REDIS, "result_backend": REDIS}


@pytest.fixture(scope='session', autouse=True)
def configure_celery(celery_config):
    from app.celery import celery
    celery.conf.update(celery_config)
    yield
    celery.conf.update({
        'broker_url': 'memory://',
        'result_backend': 'rpc://'
    })

现在当我运行测试时,出现以下错误

usr/local/lib/python3.12/site-packages/kombu/connection.py:476: OperationalError
test_1           | ------------------------------ Captured log call -------------------------------
test_1           | WARNING  kombu.connection:connection.py:669 _info - _info - No hostname was supplied. Reverting to default 'localhost'

我的设置是 dockerized 的,当我不运行 pytest 时,运行不会出现错误。我真的很困惑,考虑到我的设置看起来不错并且在我不运行测试时可以工作,我不确定是什么导致了这个错误。任何帮助将非常感激

更新。打印配置 celery 夹具的语句

Before update:redis://redis-cache:6379/1 redis://redis-cache:6379/1
After update:redis://redis-cache:6379/5 redis://redis-cache:6379/5
After reset: memory:// rpc://
python pytest celery
1个回答
0
投票

我遇到的错误似乎源于我对

@shared_task
装饰器的使用。根据 Django 的 Celery 文档here,使用
@shared_task
需要额外的配置。由于我使用的是 FastAPI,我不确定如何将此配置合并到我的项目中。

为了解决此问题,我已从使用

@shared_task
切换为直接使用带有
@celery_app.task
的 Celery 应用程序实例。我不太喜欢这种方法。但目前它正在为我工作。我愿意接受任何能够更好地与 Django 的 Celery 配置保持一致的解决方案。

如果有人对在 FastAPI 项目中正确配置

@shared_task
或更优雅的解决方案有见解,我将不胜感激您的意见。

更新 感谢这篇

博客文章
,我终于找到了设置 fastapi 应用程序以正确使用 celery 的 shared_task

的最合适方法

首先您需要按如下方式配置您的

celery.py
。请注意,我们没有创建新的 Celery 实例,而是使用
current_app
,以便
shared_task
将按预期工作

from celery import Celery, current_app as current_celery_app
def make_celery(app_name=__name__):
    # Use the current Celery app instance if it exists, otherwise create a new one
    celery = current_celery_app if current_celery_app else Celery(app_name)

    celery.conf.update(
        broker='redis://localhost:6379/1',  # Specify the broker URL
        backend='redis://localhost:6379/1',  # Specify the result backend
    )

    return celery


celery_app = make_celery()

然后您需要按如下方式设置您的

main.py
。请注意,我们使用工厂模式方法来确保使用 Celery 正确设置 Fastapi 应用程序

def create_app() -> FastAPI:
    @asynccontextmanager
    async def app_lifespan(app: FastAPI):
        app.celery = make_celery()
        yield

    app = FastAPI(lifespan=app_lifespan)

   

    @app.get("/")
    async def root():
        return {"message": "Welcome to the FastAPI application"}

    return app


app = create_app()

通过遵循此设置,您可以在 FastAPI 项目中有效地使用 @shared_task,确保正确注册和管理 Celery 任务,类似于在 Django 项目中的工作方式。令人惊讶的是,Celery 团队没有提供有关此集成的明确文档,但这种方法应该可以帮助您顺利启动和运行。

© www.soinside.com 2019 - 2024. All rights reserved.