这不一定是关于 Python 的问题,它可能是关于一般异步编程的问题。简而言之,我有未定义数量的对象,其中每个对象通过运行本质上充当无限循环的函数(通常包含 while True)来执行其操作。这些函数跨多个对象并行执行(目前它们不在它们之间共享数据,而是使用回调或将数据发送到第三个位置而不向后传递数据)。
例如,假设我有 Poller 类的对象(poller1、poller2、poller3 等直至 pollern),并且具有“读取”函数(包含无限循环)。同样,我有 Listener 类的对象(listener1、listener2、listener3,依此类推,直到listenern)。
这些函数使用 asyncio 和异步调用,一切都按我的预期运行。但是,我不确定这是否是完成此任务的正确方法,或者是否可以更有效地完成。
运行我的代码的函数如下所示:
async def loop(self):
...
tasks = [
asyncio.create_task(device.read())
for poller in self.pollers
]
tasks.extend(
[
asyncio.create_task(listener.start())
for listener in self.listeners
]
)
for task in tasks:
await task
我简化了代码,以关注问题的核心。我的问题主要是这是否是解决问题的适当方法,以及是否存在任何明显的缺点可能会使增强此功能变得更加困难。
您使用 asyncio 处理多个 IO 阻塞服务的方法从根本上来说是合理的,并且符合异步编程的最佳实践。使用 asyncio.create_task 为 Poller 和 Listener 对象安排读取和启动函数是合适的,因为它允许这些任务同时运行而不会阻塞事件循环。然而,代码中的等待任务的问题是,它会阻塞第一个未完成的任务,从而阻止其余任务按预期执行。相反,您应该使用await asyncio.gather(*tasks) 来同时运行所有任务,从而实现更好的可扩展性并更轻松地处理多个任务。这是代码的改进版本:
async def loop(self):
tasks = [
asyncio.create_task(poller.read())
for poller in self.pollers
] + [
asyncio.create_task(listener.start())
for listener in self.listeners
]
await asyncio.gather(*tasks)
这样,所有任务都得到正确管理,并且任何异常都会传播到主事件循环,确保鲁棒性和可维护性。
我不确定您的具体担忧是什么,但我认为您所做的事情没有任何缺点。但是,通过使用生成器表达式而不是创建中间列表,可以使代码更加高效(并且更清晰?):
import itertools
...
async def loop(self):
...
# Use generator expressions with itertools.chain
readers = (
asyncio.create_task(device.read())
for poller in self.pollers)
)
listeners = (
asyncio.create_task(listener.start())
for listener in self.listeners
)
# Now iterate and create the tasks:
tasks = list(itertools.chain(readers, listeners))
for task in tasks:
await task
实际例子:
import asyncio
import itertools
import time
start_time = time.time()
async def reader(i):
print(f'reader {i} started at time {time.time() - start_time}.')
await asyncio.sleep(i)
print(f'reader {i} ended at time {time.time() - start_time}.')
async def listener(i):
print(f'listener {i} started at time {time.time() - start_time}.')
await asyncio.sleep(i)
print(f'listener {i} ended at time {time.time() - start_time}.')
async def loop():
# Use generator expressions with itertools.chain
readers = (
asyncio.create_task(reader(i))
for i in range(1, 4)
)
listeners = (
asyncio.create_task(listener(i))
for i in range(1, 4)
)
# Now iterate and create the tasks:
tasks = list(itertools.chain(readers, listeners))
for task in tasks:
await task
asyncio.run(loop())
打印:
reader 1 started at time 0.000997304916381836.
reader 2 started at time 0.000997304916381836.
reader 3 started at time 0.001997232437133789.
listener 1 started at time 0.001997232437133789.
listener 2 started at time 0.001997232437133789.
listener 3 started at time 0.001997232437133789.
reader 1 ended at time 1.0143013000488281.
listener 1 ended at time 1.0184080600738525.
listener 2 ended at time 2.006826162338257.
reader 2 ended at time 2.0074567794799805.
listener 3 ended at time 3.0023696422576904.
reader 3 ended at time 3.0023696422576904.