我正在努力测试包含 celery 任务的 fastapi 端点。我想嘲笑芹菜任务。任务如下
send_email.delay(
user.email,
"Subject goes here",
html_content,
)
这里是实际的模拟功能
@pytest.fixture
def mocked_send_email(mocker):
mocker.patch('app.views.user.send_email.delay', return_value=None)
def test_create_user(client, mocked_send_email):
payload = {}
response = client.post(
f"{base_url}/user",
json=payload,
headers=admin_headers
)
mocked_send_email.assert_called_once_with(
"[email protected]",
"Subject goes here",
mock.ANY
)
这是我处理芹菜的部分
conftest.py
@pytest.fixture(scope="session")
def celery_config():
print('running celery config')
return {"broker_url": REDIS, "result_backend": REDIS}
@pytest.fixture(scope='session', autouse=True)
def configure_celery(celery_config):
from app.celery import celery
celery.conf.update(celery_config)
yield
celery.conf.update({
'broker_url': 'memory://',
'result_backend': 'rpc://'
})
现在当我运行测试时,出现以下错误
usr/local/lib/python3.12/site-packages/kombu/connection.py:476: OperationalError
test_1 | ------------------------------ Captured log call -------------------------------
test_1 | WARNING kombu.connection:connection.py:669 _info - _info - No hostname was supplied. Reverting to default 'localhost'
我的设置是 dockerized 的,当我不运行 pytest 时,运行不会出现错误。我真的很困惑,考虑到我的设置看起来不错并且在我不运行测试时可以工作,我不确定是什么导致了这个错误。任何帮助将非常感激
更新。打印配置 celery 夹具的语句
Before update:redis://redis-cache:6379/1 redis://redis-cache:6379/1
After update:redis://redis-cache:6379/5 redis://redis-cache:6379/5
After reset: memory:// rpc://
我遇到的错误似乎源于我对
@shared_task
装饰器的使用。根据 Django 的 Celery 文档here,使用 @shared_task
需要额外的配置。由于我使用的是 FastAPI,我不确定如何将此配置合并到我的项目中。
为了解决此问题,我已从使用
@shared_task
切换为直接使用带有 @celery_app.task
的 Celery 应用程序实例。我不太喜欢这种方法。但目前它正在为我工作。我愿意接受任何能够更好地与 Django 的 Celery 配置保持一致的解决方案。
如果有人对在 FastAPI 项目中正确配置
@shared_task
或更优雅的解决方案有见解,我将不胜感激您的意见。
更新 感谢这篇
博客文章,我终于找到了设置 fastapi 应用程序以正确使用 celery 的
shared_task
的最合适方法
首先您需要按如下方式配置您的
celery.py
。请注意,我们没有创建新的 Celery 实例,而是使用 current_app
,以便 shared_task
将按预期工作
from celery import Celery, current_app as current_celery_app
def make_celery(app_name=__name__):
# Use the current Celery app instance if it exists, otherwise create a new one
celery = current_celery_app if current_celery_app else Celery(app_name)
celery.conf.update(
broker='redis://localhost:6379/1', # Specify the broker URL
backend='redis://localhost:6379/1', # Specify the result backend
)
return celery
celery_app = make_celery()
然后您需要按如下方式设置您的
main.py
。请注意,我们使用工厂模式方法来确保使用 Celery 正确设置 Fastapi 应用程序
def create_app() -> FastAPI:
@asynccontextmanager
async def app_lifespan(app: FastAPI):
app.celery = make_celery()
yield
app = FastAPI(lifespan=app_lifespan)
@app.get("/")
async def root():
return {"message": "Welcome to the FastAPI application"}
return app
app = create_app()
通过遵循此设置,您可以在 FastAPI 项目中有效地使用 @shared_task,确保正确注册和管理 Celery 任务,类似于在 Django 项目中的工作方式。令人惊讶的是,Celery 团队没有提供有关此集成的明确文档,但这种方法应该可以帮助您顺利启动和运行。