使用asyncio发起多个后台任务,此时各个任务是同步的

问题描述 投票:0回答:1

考虑到您有以下代码:

import asyncio
import functools

if __name__ == '__main__':
    asyncio.run(main())


async def main():
    background_t = set()
    for i in range(6):
        task = asyncio.create_task(foo1(i))
        background_t.add(task)

async def foo1(i):
  loop = asyncio.get_running_loop()
  try:
      await asyncio.wait_for(loop.run_in_executor(None, functools.partial(foo2(), i),60)
  except asyncio.TimeoutError:
      print("timed out")

def foo2(num):
  # some blocking code

我的问题是,当每个任务都是同步操作时,这种模式是否会导致代码在六个不同的任务中同时运行。

python-3.x python-asyncio
1个回答
0
投票

快到了:

阻塞函数被正确调度为在默认执行器中的不同线程中运行,其中一些将开始运行,并并发执行。

但是,这些任务的运行数量与执行器中的工作线程数量相同——并且每个任务在运行时都会占用一个完整的工作线程。 (因此,如果任务是连续循环、没有返回或花费时间超过 60 秒,它只会阻塞工作线程)。

操作系统和 Python 运行时将在所有运行此类任务之间自动切换(尽管由于 Python 的限制高达 3.12,它们一次只能使用一个 CPU 核心,并且使用 3.13 的特殊“自由线程”版本,您可以可以免费使用各种内核中的实际并行性)。问题是,在所有工作人员都忙碌之后安排的任何任务都不会启动,直到有空闲工作人员为止 - 它只会排队等待执行。

如果阻塞原因是 I/O 代码,只需创建一个显式执行器,其中至少包含与要并行运行的任务一样多的工作线程,即可按您计划的方式工作。

如果要让工作线程保持运行,更好的策略是为每个任务使用显式线程,并用

asyncio.to_thread
代替。 请记住,其他线程中的同步函数无法自动取消:即使可等待包装器调用(
run_in_executor
to_thread
)被取消,例如通过超时,实际的同步代码将继续运行并使用系统资源,直到它自然完成。 唯一的解决方法是创建一个可以在同步代码内检查的状态(例如,实例属性或发布到队列的值(来自
queue.Queue
的面向线程的队列,而不是
asyncio.Queue

© www.soinside.com 2019 - 2024. All rights reserved.