在 FastAPI 中我有一个端点:
from aiocache import Cache, cached
@router.get("/id/{user_id}/permissions")
@cached(ttl=30, cache=Cache.MEMORY)
async def get_permissions(user_id: UUID4) -> list[Permissions]:
然后在我的单元测试中,我想清除缓存。
我尝试使用名称空间、密钥重新创建缓存。目前我有:
from my_module import get_permissions
my_cache = get_permissions.cache
await my_cache.clear()
我尝试了所有能想到的组合。但我无法通过此异步测试来清除缓存。有任何想法吗?我究竟做错了什么?难道是在不同的进程中存在?
嗯,我做了和你一样的事情,因为你的问题是在 async context 中,也许是你的测试设置?这是使用 pytest 和 pytest_asyncio 插件工作的示例。
# conftest.py
@pytest.fixture(scope="session")
def event_loop():
policy = asyncio.get_event_loop_policy()
loop = policy.new_event_loop()
yield loop
loop.close()
# test_something.py
@pytest.mark.asyncio
async def test_something():
from my_module import get_permissions
my_cache = get_permissions.cache
await my_cache.clear()
# rest of the test.