如何将 SQS 侦听器设置为 FASTAPI 应用程序的一部分,在应用程序启动后侦听消息,而不会在没有 celery 的情况下阻塞端点?

问题描述 投票:0回答:1

我有一个 fastapi 应用程序,它的 api 端点很少。我需要设置一个 sqs 监听器,它不断监听消息而不阻塞端点。

我的应用程序中有 2 个端点 - /health 和 /search。 /health 用于健康检查,/search 用于搜索某些内容。我正在尝试使用 sqs 队列而不是 http 调用将我的应用程序移动到完全异步的系统。所以我将有一个专门用于搜索请求的队列。我仍然会保留端点作为主要用于测试目的的选项。所以我需要这个监听器不断地寻找队列中的消息。

async def sqs_listener():
  logger.info("Started Listener")
  testQueue = await create_queue("testQueue")
  print(testQueue)

  while(True):
      messages = receive_messages(testQueue, 5, 10)
      print(messages)

在 main.py 中,我尝试使用生命周期来调用此侦听器,但 FastAPI 应用程序未启动,而是侦听器在无限循环中运行,阻塞主事件循环,阻止 FastAPI 正确初始化和服务请求。

@asynccontextmanager
async def lifespan(app: FastAPI):
    listener_task = asyncio.create_task(sqs_listener())
    yield

如果可能的话,寻找不需要芹菜的解决方案。

python fastapi amazon-sqs sqslistener
1个回答
0
投票

A

receive_messages
方法由于
WaitTimeSeconds
长轮询而阻塞,因此应该在另一个线程中运行以使其非阻塞。

您可以使用

asyncio.to_thread
轻松实现此目的。

您的代码不完整,所以这是新的完整代码。

import asyncio
from contextlib import asynccontextmanager

import boto3
from fastapi import FastAPI

sqs = boto3.resource("sqs")
queue = sqs.create_queue(QueueName="qqq")

async def long_polling():
    while True:
        messages = await asyncio.to_thread(
            queue.receive_messages,  # <-- blocking task
            MaxNumberOfMessages=10,
            WaitTimeSeconds=20,
        )
        # process messages..


@asynccontextmanager
async def lifespan(_: FastAPI):
    task = asyncio.create_task(long_polling())

    yield

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass


app = FastAPI(lifespan=lifespan)

查看如何取消任务:https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.