我有一个 fastapi 应用程序,它的 api 端点很少。我需要设置一个 sqs 监听器,它不断监听消息而不阻塞端点。
我的应用程序中有 2 个端点 - /health 和 /search。 /health 用于健康检查,/search 用于搜索某些内容。我正在尝试使用 sqs 队列而不是 http 调用将我的应用程序移动到完全异步的系统。所以我将有一个专门用于搜索请求的队列。我仍然会保留端点作为主要用于测试目的的选项。所以我需要这个监听器不断地寻找队列中的消息。
async def sqs_listener():
logger.info("Started Listener")
testQueue = await create_queue("testQueue")
print(testQueue)
while(True):
messages = receive_messages(testQueue, 5, 10)
print(messages)
在 main.py 中,我尝试使用生命周期来调用此侦听器,但 FastAPI 应用程序未启动,而是侦听器在无限循环中运行,阻塞主事件循环,阻止 FastAPI 正确初始化和服务请求。
@asynccontextmanager
async def lifespan(app: FastAPI):
listener_task = asyncio.create_task(sqs_listener())
yield
如果可能的话,寻找不需要芹菜的解决方案。
A
receive_messages
方法由于 WaitTimeSeconds
的长轮询而阻塞,因此应该在另一个线程中运行以使其非阻塞。
您可以使用
asyncio.to_thread
轻松实现此目的。
您的代码不完整,所以这是新的完整代码。
import asyncio
from contextlib import asynccontextmanager
import boto3
from fastapi import FastAPI
sqs = boto3.resource("sqs")
queue = sqs.create_queue(QueueName="qqq")
async def long_polling():
while True:
messages = await asyncio.to_thread(
queue.receive_messages, # <-- blocking task
MaxNumberOfMessages=10,
WaitTimeSeconds=20,
)
# process messages..
@asynccontextmanager
async def lifespan(_: FastAPI):
task = asyncio.create_task(long_polling())
yield
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
app = FastAPI(lifespan=lifespan)
查看如何取消任务:https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel