扩展应用程序时,concurrent.futures.ThreadPoolExecutor 最大工作线程数的工作

问题描述 投票:0回答:1

我是 Python 编程新手。我的大部分代码都使用

asyncio
,因为我正在对数据库进行 IO 调用,尽管在某些情况下我使用非异步方法,这些方法像很少的 Pandas 框架调用数据库一样长时间运行,因此避免阻塞调用限制了可扩展性,我使用
concurrent.futures.ThreadPoolExecutor
来执行阻塞方法,如下所示:

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
      values = executor.map(func, data)

上面的

func
提供的是最大长度为2的数据收集,基本上不需要超过2个线程,但是当多个用户进来并且应用程序需要扩展时,这时候,什么才是理想的
max_workers
值:

  1. 是否每个用户都需要,是2个
  2. 是否应该是最大可能值,如链接中所述 - https://docs.python.org/3/library/concurrent.futures.html

3.8 版本更改:max_workers 的默认值更改为 min(32, os.cpu_count() + 4)。此默认值为 I/O 绑定任务保留至少 5 个工作线程。它最多使用 32 个 CPU 核心来执行释放 GIL 的 CPU 密集型任务。而且它避免了在多核机器上隐式使用非常大的资源。

  1. 我根本不提它吗?它可以根据要求生成

重点仍然是,如果 10 个用户开始执行相同的操作,他们最终会使用相同的

ThreadPoolExecutor
(共享)还是他们最终会获得不同的执行器,因为这不是共享对象。我想确保扩大应用程序时不会因设计不正确而受到影响

python python-3.x python-asyncio threadpoolexecutor
1个回答
4
投票

如果您从异步代码中调用 ThreadPoolExecutor,则应该使用

asyncio
run_in_executor 函数,否则它将阻塞主事件循环。

如果额外的工作负载受 CPU 限制,那么您还应该使用 ProcessPoolExecutor 来代替。

Python 文档中的示例:

import asyncio
import concurrent.futures

def cpu_bound():
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    return sum(i * i for i in range(10 ** 7))

async def main():
    loop = asyncio.get_running_loop()

    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound)
        print('custom process pool', result)

asyncio.run(main())

当涉及到

max_workers
时,默认值通常就可以了:

  • ThreadPoolExecutor
    :

    • os.cpu_count() * 5
      来自 Python 3.5;
    • min(32, os.cpu_count() + 4)
      来自 Python 3.8;
    • min(32, (os.process_cpu_count() or 1) + 4)
      来自 Python 3.13;
  • ProcessPoolExecutor
    os.cpu_count() or 1
    (来自 Python 3.13 的
    os.process_cpu_count() or 1
    )。

这取决于您的工作负载(CPU 与 I/O 限制),默认值较高“假设 ThreadPoolExecutor 通常用于重叠 I/O 而不是 CPU 工作”,但对于 CPU 限制任务来说没有意义将其设置为大于可用 CPU 的数字,因为它实际上可能会由于上下文切换等而降低性能。

两个执行器都使用队列在可用线程/进程上排队和调度任务。

更新:2021 年 3 月 25 日星期四 15:17:51 UTC 2021

asyncio
事件循环是单线程的,因此当您同时安排其他协程时您会看到这个问题。正如您所看到的,
none-blocking
任务被
blocking executor
阻塞了10秒:

$ python test.py
START none-blocking executor: (scheduled: 5.0s)
START none-blocking: (scheduled: 1.0s)
START blocking executor: (scheduled: 10.0s)
END none-blocking executor: (elapsed: 5.0s)
END blocking executor: (elapsed: 10.0s)
END none-blocking: (elapsed: 10.0s)

如果您运行几次并且

blocking executor
将首先启动,则在
none-blocking
结束之前,
blocking executor
任务甚至不会启动:

$ python test.py
START none-blocking executor: (scheduled: 5.0s)
START blocking executor: (scheduled: 10.0s)
END none-blocking executor: (elapsed: 5.0s)
END blocking executor: (elapsed: 10.0s)
START none-blocking: (scheduled: 1.0s)
END none-blocking: (elapsed: 1.0s)

当您注释掉

blocking executor
时,您可以看到所有调用现在都是异步的:

$ python test.py
START none-blocking executor: (scheduled: 5.0s)
START none-blocking: (scheduled: 1.0s)
END none-blocking: (elapsed: 1.0s)
END none-blocking executor: (elapsed: 5.0s)

关键的一点是,一旦开始编写异步代码,就不能将其与同步调用混合在一起。

测试.py:

import asyncio
import time

from concurrent.futures import ThreadPoolExecutor


def blocking(msg, t):
    t1 = time.perf_counter()

    print(f"START {msg}: (scheduled: {t}s)")
    time.sleep(t)
    print(f"END {msg}: (elapsed: {time.perf_counter() - t1:.1f}s)")


async def task1(msg, t):
    t1 = time.perf_counter()

    print(f"START {msg}: (scheduled: {t}s)")
    await asyncio.sleep(t)
    print(f"END {msg}: (elapsed: {time.perf_counter() - t1:.1f}s)")


async def task2(msg, t):
    with ThreadPoolExecutor() as executor:
        future = executor.submit(blocking, msg, t)
        future.result()


async def main():
    loop = asyncio.get_running_loop()

    aws = [
        task1("none-blocking", 1.0),
        loop.run_in_executor(None, blocking, "none-blocking executor", 5.0),
        task2("blocking executor", 10.0),
    ]

    for coro in asyncio.as_completed(aws):
        await coro


if __name__ == "__main__":
    asyncio.run(main())
© www.soinside.com 2019 - 2024. All rights reserved.