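I'm having trouble with an asyncio queue. Specifically, `await queue.get()` seems to block the rest of my application, causing it to hang.
I ran into a similar problem discussed here: Why does asyncio queue await get() block?, but I still struggle to understand how to solve the problem in my case.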
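In my setup, I'm trying to build a web-scraping API (using FastAPI) that lets users submit URLs, which are batched and processed by different web scrapers running in consumer tasks. However, as soon as the consumers start waiting for new batches in the queue, the application seems to stall. Specifically, I call `start_consumers()` during the FastAPI lifespan event, before the application starts, but the application never finishes initializing because it is blocked.
Is there a way to modify my setup so that the consumers can wait for new items in the queue without blocking the rest of the application? Or is this approach doomed to fail?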
```python
import asyncio
from asyncio.queues import Queue

from internal.services.webscraping.base import BaseScraper


class QueueController:
    _logger = create_logger(__name__)  # project-specific helper; its import is not shown

    def __init__(
        self,
        scrapers: list[BaseScraper],
        batch_size: int = 50
    ):
        self.queue = Queue()
        self.batch_size = batch_size
        self.scrapers = scrapers
        self.consumers = []
        self.running = False

    async def put(self, urls: list[str]) -> None:
        """
        Add batches of URLs to the queue.
        """
        # Split the list of URLs into batches of size self.batch_size and add them to the queue.
        # Queue() was created without maxsize, so it is unbounded and put() never waits;
        # pass maxsize to Queue() if producers should wait for free space.
        for i in range(0, len(urls), self.batch_size):
            batch = urls[i:i + self.batch_size]
            await self.queue.put(batch)

    async def get(self) -> list[str]:
        """
        Retrieve a batch from the queue.
        """
        # Get a batch of URLs from the queue.
        # If the queue is empty, this suspends only the awaiting task until an item is available.
        return await self.queue.get()

    async def consumer(self, scraper: BaseScraper) -> None:
        """
        Consumer coroutine that processes batches of URLs.
        """
        # Consumer tasks are designed to run in an infinite loop (as long as self.running is
        # True) and fetch batches of URLs from the queue.
        while self.running:
            try:
                batch = await self.get()
                if batch:
                    records = await scraper.run(batch)
                    # TODO: Handle saving of result
            except Exception:
                # TODO: Add proper error handling
                ...
                raise

    async def start_consumers(self) -> None:
        """
        Start the consumer tasks.

        Notes:
            https://docs.python.org/3/library/asyncio-task.html
            https://superfastpython.com/asyncio-task/
        """
        self.running = True
        self.consumers = [
            asyncio.create_task(self.consumer(scraper)) for scraper in self.scrapers
        ]
        # gather() only returns once every consumer has finished. The consumers loop
        # until they are cancelled, so awaiting start_consumers() does not return
        # while they are running.
        await asyncio.gather(*self.consumers)

    async def stop_consumers(self) -> None:
        """
        Stop all consumer tasks gracefully.
        """
        self.running = False
        for task in self.consumers:
            task.cancel()
        # Wait until the cancelled tasks have actually finished.
        await asyncio.gather(*self.consumers, return_exceptions=True)
```
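To separate the queue behaviour from FastAPI, here is a minimal plain-asyncio sketch showing that `await queue.get()` on an empty queue suspends only the task that awaits it, while other work keeps running on the event loop. The `consumer`, `ticker` and `main` helpers are hypothetical and exist only for this illustration.

```python
import asyncio


async def consumer(queue: asyncio.Queue, seen: list[str]) -> None:
    # Suspends here while the queue is empty; the event loop keeps running other work.
    item = await queue.get()
    seen.append(f"consumed {item}")


async def ticker(seen: list[str]) -> None:
    # Keeps making progress while the consumer waits on the empty queue.
    for i in range(3):
        seen.append(f"tick {i}")
        await asyncio.sleep(0.01)


async def main() -> list[str]:
    queue: asyncio.Queue = asyncio.Queue()
    seen: list[str] = []
    consumer_task = asyncio.create_task(consumer(queue, seen))
    await ticker(seen)          # finishes even though the consumer is still waiting
    await queue.put("batch-1")  # wakes the waiting consumer
    await consumer_task
    return seen


if __name__ == "__main__":
    print(asyncio.run(main()))
```

All three ticks are recorded before the consumer receives its item, so the waiting `get()` never stopped the rest of the program.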
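I figured it out last night. The problem had nothing to do with the `QueueController` class above; it was how I started it in the FastAPI lifespan. Here is what I was doing: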
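```python
from contextlib import asynccontextmanager

from fastapi import FastAPI

# queues: the list of QueueController instances, defined elsewhere in the app.


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Start the consumer tasks.
    for queue in queues:
        await queue.start_consumers()  # <----- Issue is here
    yield
    # Gracefully stop the consumer tasks.
    for queue in queues:
        await queue.stop_consumers()


app = FastAPI(
    lifespan=lifespan,
)
```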
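A stripped-down reproduction of the hang without FastAPI, under these assumptions: `consumer` and `start_consumers` are hypothetical simplified stand-ins for the `QueueController` methods, and `asyncio.wait_for` with a short timeout is used only to make the hang observable instead of waiting forever.

```python
import asyncio


async def consumer(queue: asyncio.Queue) -> None:
    # Like QueueController.consumer: loops forever waiting for batches.
    while True:
        await queue.get()
        queue.task_done()


async def start_consumers(queue: asyncio.Queue, n: int) -> None:
    tasks = [asyncio.create_task(consumer(queue)) for _ in range(n)]
    # As in the original: gather() returns only when every consumer finishes,
    # and these consumers never finish on their own.
    await asyncio.gather(*tasks)


async def main() -> bool:
    queue: asyncio.Queue = asyncio.Queue()
    try:
        # Stand-in for "await queue.start_consumers()" before "yield" in lifespan.
        await asyncio.wait_for(start_consumers(queue, 2), timeout=0.1)
    except asyncio.TimeoutError:
        return True  # never got past start_consumers(): this is the hang
    return False


if __name__ == "__main__":
    print(asyncio.run(main()))
```

`main()` returns `True`: the `await` on `start_consumers()` never completes by itself, which is why `lifespan` never reached `yield`.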
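I should have used `asyncio.create_task` to submit the coroutine so that it runs "in the background". Because `start_consumers()` ends with `await asyncio.gather(...)` over consumers that loop until they are cancelled, awaiting it directly never returns, so `lifespan` never reaches `yield` and FastAPI never finishes starting up. Wrapping the call in a task schedules it on the event loop and lets `lifespan` continue immediately: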
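```python
import asyncio
from contextlib import asynccontextmanager

from fastapi import FastAPI

# queues: the list of QueueController instances, defined elsewhere in the app.


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Start the consumer tasks in the background. Keep references to the tasks:
    # the event loop only holds weak references, so an unreferenced task can be
    # garbage-collected before it finishes.
    startup_tasks = [asyncio.create_task(queue.start_consumers()) for queue in queues]
    yield
    # Gracefully stop the consumer tasks.
    for queue in queues:
        await queue.stop_consumers()


app = FastAPI(
    lifespan=lifespan,
)
```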
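An alternative, sketched under assumptions: move `create_task` into `start_consumers()` itself and drop the `gather`, so the method returns as soon as the consumers are scheduled and can still be awaited directly in `lifespan`; `stop_consumers()` then cancels the tasks and awaits them. `MiniQueueController`, its `processed` list (standing in for `scraper.run`), and a `lifespan` that takes the controllers instead of a FastAPI `app` are all hypothetical, chosen to keep the example runnable without FastAPI. `task_done()` and `join()` are standard `asyncio.Queue` methods, used here only so the example can wait until the batches have been processed.

```python
import asyncio
from contextlib import asynccontextmanager


class MiniQueueController:
    """Hypothetical, stripped-down stand-in for QueueController."""

    def __init__(self, n_consumers: int = 2) -> None:
        self.queue: asyncio.Queue = asyncio.Queue()
        self.n_consumers = n_consumers
        self.consumers: list[asyncio.Task] = []
        self.processed: list[str] = []

    async def consumer(self) -> None:
        while True:
            batch = await self.queue.get()  # suspends only this task
            try:
                self.processed.append(batch)  # stand-in for scraper.run(batch)
            finally:
                self.queue.task_done()

    async def start_consumers(self) -> None:
        # Schedule the consumers and return immediately (no gather here).
        self.consumers = [
            asyncio.create_task(self.consumer()) for _ in range(self.n_consumers)
        ]

    async def stop_consumers(self) -> None:
        for task in self.consumers:
            task.cancel()
        # Wait for the cancellations; CancelledError is collected, not raised.
        await asyncio.gather(*self.consumers, return_exceptions=True)


@asynccontextmanager
async def lifespan(controllers: list[MiniQueueController]):
    # Mirrors the FastAPI lifespan: start before yield, stop after.
    for controller in controllers:
        await controller.start_consumers()  # returns at once, so startup completes
    yield
    for controller in controllers:
        await controller.stop_consumers()


async def main() -> list[str]:
    controller = MiniQueueController()  # created inside the running event loop
    async with lifespan([controller]):
        # Plays the role of the running app: submit work, wait for it to drain.
        for batch in ("batch-1", "batch-2", "batch-3"):
            await controller.queue.put(batch)
        await controller.queue.join()
    return sorted(controller.processed)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With this design, the original `await queue.start_consumers()` line in the FastAPI lifespan would work unchanged, and the consumer tasks stay referenced in `self.consumers` for as long as the controller exists.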