Why does await queue.get() block and cause my async consumer to hang?


I am having trouble with an asyncio queue. Specifically, await queue.get() seems to block the rest of my application, causing it to hang.

I came across a similar problem discussed here: "Why is asyncio queue await get() blocking?", but I am still struggling to see how to fix it in my case.

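In my setup, I am trying to build a web-scraping API (with FastAPI) that lets users submit URLs, which are then batched and processed by different web scrapers running in consumer tasks. However, as soon as a consumer waits for a new batch from the queue, the application seems to stop. Specifically, I call start_consumers() during the FastAPI lifespan event, before the application starts, but the application never finishes initializing because it is blocked.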

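Is there a way to change my setup so that the consumers can wait for new items in the queue without blocking the rest of the application? Or is this approach doomed to fail?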

import asyncio
from asyncio.queues import Queue

from internal.services.webscraping.base import BaseScraper


class QueueController:

    _logger = create_logger(__name__)

    def __init__(
        self,
        scrapers: list[BaseScraper],
        batch_size: int = 50
    ):
        self.queue = Queue()
        self.batch_size = batch_size
        self.scrapers = scrapers
        self.consumers = []
        self.running = False

    async def put(self, urls: list[str]) -> None:
        """
        Add batches of URLs to the queue.
        """
        # Split the list of URLs into batches of size self.batch_size and add them to the queue.
        # If the queue is full, wait until there is space available.
        for i in range(0, len(urls), self.batch_size):
            batch = urls[i:i + self.batch_size]
            await self.queue.put(batch)

    async def get(self) -> list[str]:
        """
        Retrieve a batch from the queue.
        """
        # Get a batch of URLs from the queue.
        # If queue is empty, wait until an item is available.
        return await self.queue.get()

    async def consumer(self, scraper: BaseScraper) -> None:
        """
        Consumer coroutine that processes batches of URLs.
        """
        # Consumer tasks are designed to run in an infinite loop (as long as self.running is 
        # True) and fetch batches of URLs from the queue.
        while self.running:
            try:
                batch = await self.get()
                if batch:
                    records = await scraper.run(batch)
                    # TODO: Handle saving of result
            except Exception:
                # TODO: Add proper error handling
                ...
                # A bare raise re-raises the active exception with its original traceback.
                raise

    async def start_consumers(self) -> None:
        """
        Start the consumer tasks.

        Notes:
            https://docs.python.org/3/library/asyncio-task.html
            https://superfastpython.com/asyncio-task/
        """
        self.running = True
        self.consumers = [
            asyncio.create_task(self.consumer(scraper)) for scraper in self.scrapers
        ]
        await asyncio.gather(*self.consumers)

    async def stop_consumers(self) -> None:
        """
        Stop all consumer tasks gracefully.
        """
        self.running = False
        for task in self.consumers:
            task.cancel()
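
A side note on the comment in put(): Queue() is created without a maxsize, so the queue is unbounded and put() never waits for space. The following is a minimal sketch, not part of the original code; `demo` and the 0.1-second timeout are assumptions made only so that the example terminates. It contrasts an unbounded queue with one created with maxsize=1.

```python
import asyncio


async def demo() -> tuple[bool, bool]:
    unbounded: asyncio.Queue = asyncio.Queue()         # maxsize=0 means no limit
    bounded: asyncio.Queue = asyncio.Queue(maxsize=1)  # holds at most one item

    # put() on an unbounded queue never has to wait for free space.
    for i in range(1000):
        await unbounded.put(i)
    unbounded_ok = unbounded.qsize() == 1000

    # The bounded queue accepts one item; the next put() waits for a consumer.
    await bounded.put("batch-1")
    blocked = False
    try:
        # The timeout is only here so the demonstration ends.
        await asyncio.wait_for(bounded.put("batch-2"), timeout=0.1)
    except asyncio.TimeoutError:
        blocked = True
    return unbounded_ok, blocked
```

If producers should slow down when the scrapers fall behind, passing a maxsize to Queue() is one way to get that behaviour.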

Tags: python, asynchronous, queue, python-asyncio, fastapi

1 Answer

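I figured it out last night. It had nothing to do with the queue logic in the QueueController class above, but with how I was starting it in the FastAPI lifespan. start_consumers() ends with await asyncio.gather(*self.consumers), and the consumers loop indefinitely, so awaiting it never returns and lifespan never reaches yield. Here is what I was doing: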

from contextlib import asynccontextmanager

from fastapi import FastAPI


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Start the consumer tasks.
    for queue in queues:
        await queue.start_consumers() # <----- Issue is here: this await never returns

    yield

    # Gracefully stop the consumer tasks.
    for queue in queues:
        await queue.stop_consumers()

app = FastAPI(
    lifespan=lifespan,
)
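
The hang can be reproduced without FastAPI. The following is a minimal sketch, not the original application: `consumer`, `start_consumers_blocking`, and `demo` are hypothetical names, and the 0.1-second timeout exists only so that the demonstration terminates. It suggests that await queue.get() suspends only the consumer task, whereas awaiting asyncio.gather over consumers that never finish keeps the caller from returning.

```python
import asyncio


async def consumer(queue: asyncio.Queue, seen: list) -> None:
    # Loops forever; `await queue.get()` suspends only this task
    # while the queue is empty, not the whole event loop.
    while True:
        item = await queue.get()
        seen.append(item)
        queue.task_done()


async def start_consumers_blocking(queue: asyncio.Queue, seen: list) -> None:
    # Same shape as start_consumers() in the question:
    # create the task, then await it.
    task = asyncio.create_task(consumer(queue, seen))
    await asyncio.gather(task)  # never returns: the consumer never finishes


async def demo() -> tuple[bool, list]:
    queue: asyncio.Queue = asyncio.Queue()
    seen: list = []
    await queue.put("batch-1")
    timed_out = False
    try:
        # Stand-in for `await queue.start_consumers()` inside lifespan.
        await asyncio.wait_for(start_consumers_blocking(queue, seen), timeout=0.1)
    except asyncio.TimeoutError:
        timed_out = True
    return timed_out, seen
```

Calling asyncio.run(demo()) reports a timeout even though the queued batch was consumed: the consumer works, and it is the caller that is stuck waiting for it to finish.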

I should have used asyncio.create_task to submit the coroutine to run "in the background":

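import asyncio
from contextlib import asynccontextmanager

from fastapi import FastAPI


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Start the consumer tasks in the background. Keep references to the
    # tasks so they are not garbage-collected while still running.
    background_tasks = [
        asyncio.create_task(queue.start_consumers()) for queue in queues
    ]

    yield

    # Gracefully stop the consumer tasks.
    for queue in queues:
        await queue.stop_consumers()

app = FastAPI(
    lifespan=lifespan,
)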
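
An alternative is to make the controller itself non-blocking, so that lifespan can keep awaiting it directly. The following is a sketch under assumptions, not the original QueueController: `MiniController`, `Handler`, `handle`, and `demo` are hypothetical names, and the scraper is replaced by a plain async callable. Here start_consumers() only creates the tasks and returns, holding references to them, and stop_consumers() cancels the tasks and then waits for the cancellations to complete.

```python
import asyncio
from typing import Awaitable, Callable

# Hypothetical stand-in for a scraper: any async callable that takes a batch.
Handler = Callable[[list[str]], Awaitable[None]]


class MiniController:
    def __init__(self, handlers: list[Handler]) -> None:
        self.queue: asyncio.Queue = asyncio.Queue()
        self.handlers = handlers
        self.consumers: list[asyncio.Task] = []

    async def consumer(self, handler: Handler) -> None:
        while True:
            batch = await self.queue.get()
            try:
                await handler(batch)
            finally:
                # Mark the batch as handled so queue.join() can return.
                self.queue.task_done()

    async def start_consumers(self) -> None:
        # Create the tasks and return immediately; there is no gather() here.
        self.consumers = [
            asyncio.create_task(self.consumer(h)) for h in self.handlers
        ]

    async def stop_consumers(self) -> None:
        for task in self.consumers:
            task.cancel()
        # return_exceptions=True collects each CancelledError instead of raising it.
        await asyncio.gather(*self.consumers, return_exceptions=True)
        self.consumers = []


async def demo() -> list[list[str]]:
    processed: list[list[str]] = []

    async def handle(batch: list[str]) -> None:
        processed.append(batch)

    controller = MiniController([handle])
    await controller.start_consumers()  # returns right away
    await controller.queue.put(["https://example.com/a"])
    await controller.queue.join()       # wait until the batch has been handled
    await controller.stop_consumers()
    return processed
```

With this shape, awaiting start_consumers() inside lifespan returns immediately and the application can reach yield, while shutdown still waits for the consumers to finish cancelling.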