考虑到您有以下代码:
import asyncio
import functools
if __name__ == '__main__':
asyncio.run(main())
async def main():
background_t = set()
for i in range(6):
task = asyncio.create_task(foo1(i))
background_t.add(task)
async def foo1(i):
loop = asyncio.get_running_loop()
try:
await asyncio.wait_for(loop.run_in_executor(None, functools.partial(foo2(), i),60)
except asyncio.TimeoutError:
print("timed out")
def foo2(num):
# some blocking code
我的问题是,当每个任务都是同步操作时,这种模式是否会导致代码在六个不同的任务中同时运行。
快到了:
阻塞函数被正确调度为在默认执行器中的不同线程中运行,其中一些将开始运行,并并发执行。
但是,这些任务的运行数量与执行器中的工作线程数量相同——并且每个任务在运行时都会占用一个完整的工作线程。 (因此,如果任务是连续循环、没有返回或花费时间超过 60 秒,它只会阻塞工作线程)。
操作系统和 Python 运行时将在所有运行此类任务之间自动切换(尽管由于 Python 的限制高达 3.12,它们一次只能使用一个 CPU 核心,并且使用 3.13 的特殊“自由线程”版本,您可以可以免费使用各种内核中的实际并行性)。问题是,在所有工作人员都忙碌之后安排的任何任务都不会启动,直到有空闲工作人员为止 - 它只会排队等待执行。
如果阻塞原因是 I/O 代码,只需创建一个显式执行器,其中至少包含与要并行运行的任务一样多的工作线程,即可按您计划的方式工作。
如果要让工作线程保持运行,更好的策略是为每个任务使用显式线程,并用
asyncio.to_thread
代替。
请记住,其他线程中的同步函数无法自动取消:即使可等待包装器调用(run_in_executor
或to_thread
)被取消,例如通过超时,实际的同步代码将继续运行并使用系统资源,直到它自然完成。
唯一的解决方法是创建一个可以在同步代码内检查的状态(例如,实例属性或发布到队列的值(来自 queue.Queue
的面向线程的队列,而不是 asyncio.Queue
)