我是 Python 编程新手。我的大部分代码都使用
asyncio
,因为我正在对数据库进行 IO 调用,尽管在某些情况下我使用非异步方法,这些方法像很少的 Pandas 框架调用数据库一样长时间运行,因此避免阻塞调用限制了可扩展性,我使用 concurrent.futures.ThreadPoolExecutor
来执行阻塞方法,如下所示:
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
values = executor.map(func, data)
上面的
func
提供的是最大长度为2的数据收集,基本上不需要超过2个线程,但是当多个用户进来并且应用程序需要扩展时,这时候,什么才是理想的max_workers
值:
3.8 版本更改:max_workers 的默认值更改为 min(32, os.cpu_count() + 4)。此默认值为 I/O 绑定任务保留至少 5 个工作线程。它最多使用 32 个 CPU 核心来执行释放 GIL 的 CPU 密集型任务。而且它避免了在多核机器上隐式使用非常大的资源。
重点仍然是,如果 10 个用户开始执行相同的操作,他们最终会使用相同的
ThreadPoolExecutor
(共享)还是他们最终会获得不同的执行器,因为这不是共享对象。我想确保扩大应用程序时不会因设计不正确而受到影响
如果您从异步代码中调用 ThreadPoolExecutor,则应该使用
asyncio
run_in_executor 函数,否则它将阻塞主事件循环。
如果额外的工作负载受 CPU 限制,那么您还应该使用 ProcessPoolExecutor 来代替。
Python 文档中的示例:
import asyncio
import concurrent.futures
def cpu_bound():
# CPU-bound operations will block the event loop:
# in general it is preferable to run them in a
# process pool.
return sum(i * i for i in range(10 ** 7))
async def main():
loop = asyncio.get_running_loop()
with concurrent.futures.ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, cpu_bound)
print('custom process pool', result)
asyncio.run(main())
当涉及到
max_workers
时,默认值通常就可以了:
ThreadPoolExecutor
:
os.cpu_count() * 5
来自 Python 3.5;min(32, os.cpu_count() + 4)
来自 Python 3.8;min(32, (os.process_cpu_count() or 1) + 4)
来自 Python 3.13;ProcessPoolExecutor
:os.cpu_count() or 1
(来自 Python 3.13 的os.process_cpu_count() or 1
)。
这取决于您的工作负载(CPU 与 I/O 限制),默认值较高“假设 ThreadPoolExecutor 通常用于重叠 I/O 而不是 CPU 工作”,但对于 CPU 限制任务来说没有意义将其设置为大于可用 CPU 的数字,因为它实际上可能会由于上下文切换等而降低性能。
两个执行器都使用队列在可用线程/进程上排队和调度任务。
更新:2021 年 3 月 25 日星期四 15:17:51 UTC 2021
asyncio
事件循环是单线程的,因此当您同时安排其他协程时您会看到这个问题。正如您所看到的,none-blocking
任务被blocking executor
阻塞了10秒:
$ python test.py
START none-blocking executor: (scheduled: 5.0s)
START none-blocking: (scheduled: 1.0s)
START blocking executor: (scheduled: 10.0s)
END none-blocking executor: (elapsed: 5.0s)
END blocking executor: (elapsed: 10.0s)
END none-blocking: (elapsed: 10.0s)
如果您运行几次并且
blocking executor
将首先启动,则在 none-blocking
结束之前,blocking executor
任务甚至不会启动:
$ python test.py
START none-blocking executor: (scheduled: 5.0s)
START blocking executor: (scheduled: 10.0s)
END none-blocking executor: (elapsed: 5.0s)
END blocking executor: (elapsed: 10.0s)
START none-blocking: (scheduled: 1.0s)
END none-blocking: (elapsed: 1.0s)
当您注释掉
blocking executor
时,您可以看到所有调用现在都是异步的:
$ python test.py
START none-blocking executor: (scheduled: 5.0s)
START none-blocking: (scheduled: 1.0s)
END none-blocking: (elapsed: 1.0s)
END none-blocking executor: (elapsed: 5.0s)
关键的一点是,一旦开始编写异步代码,就不能将其与同步调用混合在一起。
测试.py:
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
def blocking(msg, t):
t1 = time.perf_counter()
print(f"START {msg}: (scheduled: {t}s)")
time.sleep(t)
print(f"END {msg}: (elapsed: {time.perf_counter() - t1:.1f}s)")
async def task1(msg, t):
t1 = time.perf_counter()
print(f"START {msg}: (scheduled: {t}s)")
await asyncio.sleep(t)
print(f"END {msg}: (elapsed: {time.perf_counter() - t1:.1f}s)")
async def task2(msg, t):
with ThreadPoolExecutor() as executor:
future = executor.submit(blocking, msg, t)
future.result()
async def main():
loop = asyncio.get_running_loop()
aws = [
task1("none-blocking", 1.0),
loop.run_in_executor(None, blocking, "none-blocking executor", 5.0),
task2("blocking executor", 10.0),
]
for coro in asyncio.as_completed(aws):
await coro
if __name__ == "__main__":
asyncio.run(main())