使用asyncio处理多个IO阻塞服务

问题描述 投票:0回答:2

这不一定是关于 Python 的问题,它可能是关于一般异步编程的问题。简而言之,我有未定义数量的对象,其中每个对象通过运行本质上充当无限循环的函数(通常包含 while True)来执行其操作。这些函数跨多个对象并行执行(目前它们不在它们之间共享数据,而是使用回调或将数据发送到第三个位置而不向后传递数据)。

例如,假设我有 Poller 类的对象(poller1、poller2、poller3 等直至 pollern),并且具有“读取”函数(包含无限循环)。同样,我有 Listener 类的对象(listener1、listener2、listener3,依此类推,直到listenern)。

这些函数使用 asyncio 和异步调用,一切都按我的预期运行。但是,我不确定这是否是完成此任务的正确方法,或者是否可以更有效地完成。

运行我的代码的函数如下所示:

async def loop(self):
    ...
    tasks = [
        asyncio.create_task(device.read())
        for poller in self.pollers
    ]

    tasks.extend(
        [
            asyncio.create_task(listener.start())
            for listener in self.listeners
        ]
    )

    for task in tasks:
        await task

我简化了代码,以关注问题的核心。我的问题主要是这是否是解决问题的适当方法,以及是否存在任何明显的缺点可能会使增强此功能变得更加困难。

python concurrency python-asyncio
2个回答
0
投票

您使用 asyncio 处理多个 IO 阻塞服务的方法从根本上来说是合理的,并且符合异步编程的最佳实践。使用 asyncio.create_task 为 Poller 和 Listener 对象安排读取和启动函数是合适的,因为它允许这些任务同时运行而不会阻塞事件循环。然而,代码中的等待任务的问题是,它会阻塞第一个未完成的任务,从而阻止其余任务按预期执行。相反,您应该使用await asyncio.gather(*tasks) 来同时运行所有任务,从而实现更好的可扩展性并更轻松地处理多个任务。这是代码的改进版本:

async def loop(self):
    tasks = [
        asyncio.create_task(poller.read())
        for poller in self.pollers
    ] + [
        asyncio.create_task(listener.start())
        for listener in self.listeners
    ]
    await asyncio.gather(*tasks)

这样,所有任务都得到正确管理,并且任何异常都会传播到主事件循环,确保鲁棒性和可维护性。


0
投票

我不确定您的具体担忧是什么,但我认为您所做的事情没有任何缺点。但是,通过使用生成器表达式而不是创建中间列表,可以使代码更加高效(并且更清晰?):

import itertools

...

async def loop(self):
    ...
    
    # Use generator expressions with itertools.chain
    
    readers = (
        asyncio.create_task(device.read())
        for poller in self.pollers)
    )
    
    listeners = (
        asyncio.create_task(listener.start())
        for listener in self.listeners
    )

    # Now iterate and create the tasks: 
    tasks = list(itertools.chain(readers, listeners))

    for task in tasks:
        await task

实际例子:

import asyncio
import itertools
import time

start_time = time.time()

async def reader(i):
    print(f'reader {i} started at time {time.time() - start_time}.')
    await asyncio.sleep(i)
    print(f'reader {i} ended at time {time.time() - start_time}.')

async def listener(i):
    print(f'listener {i} started at time {time.time() - start_time}.')
    await asyncio.sleep(i)
    print(f'listener {i} ended at time {time.time() - start_time}.')

async def loop():
    # Use generator expressions with itertools.chain

    readers = (
        asyncio.create_task(reader(i))
        for i in range(1, 4)
    )

    listeners = (
        asyncio.create_task(listener(i))
        for i in range(1, 4)
    )

    # Now iterate and create the tasks:
    tasks = list(itertools.chain(readers, listeners))

    for task in tasks:
        await task

asyncio.run(loop())

打印:

reader 1 started at time 0.000997304916381836.
reader 2 started at time 0.000997304916381836.
reader 3 started at time 0.001997232437133789.
listener 1 started at time 0.001997232437133789.
listener 2 started at time 0.001997232437133789.
listener 3 started at time 0.001997232437133789.
reader 1 ended at time 1.0143013000488281.
listener 1 ended at time 1.0184080600738525.
listener 2 ended at time 2.006826162338257.
reader 2 ended at time 2.0074567794799805.
listener 3 ended at time 3.0023696422576904.
reader 3 ended at time 3.0023696422576904.
© www.soinside.com 2019 - 2024. All rights reserved.