如何在 Python 中有效地结合异步 I/O 和 CPU 密集型任务?

问题描述 投票:0回答:1

我正在开发一个 Python 项目,我需要处理异步 I/O 任务(如读取文件或发出 API 请求)和 CPU 密集型任务(如数据处理或计算)。我正在努力寻找有效结合这两者的最佳方法。

问题:

  • 我需要发出异步API请求并使用CPU密集型任务处理结果。
  • 挑战在于,在等待 I/O(API 响应)时,我想并行处理数据以避免性能瓶颈。
  • 如何平衡异步 I/OCPU 密集型任务,而不阻塞事件循环或低效运行任务?

例如,在这段代码中:

import asyncio
from time import sleep

async def fetch_data(api_url):
    # Simulate an API request
    await asyncio.sleep(2)
    return f"Data from {api_url}"

def process_data(data):
    # Simulate CPU-bound data processing
    sleep(3)
    return data.upper()

async def main():
    api_urls = ['api/1', 'api/2', 'api/3']
    
    # Fetch and process data
    tasks = []
    for url in api_urls:
        data = await fetch_data(url)
        result = process_data(data)  # Blocking the event loop here
        tasks.append(result)

    print(tasks)

asyncio.run(main())

此示例在处理数据时会阻塞事件循环,从而导致效率低下。我知道我可以将

concurrent.futures
用于 CPU 绑定部分,但我不确定如何将其与 asyncio 有效地混合。

  1. 如何在不阻塞事件循环的情况下组合异步 I/O(如
    await fetch_data()
    )和CPU 密集型任务(如
    process_data()
    )?
  2. 是否有一种有效的方法来并行运行CPU密集型任务,同时仍然处理异步I/O而不减慢整个过程?

我期望:

  • 进行多个异步 API 调用。
  • 在不阻塞事件循环的情况下处理数据。
  • 以高效、无阻塞的方式获取处理结果。
python async-await concurrency parallel-processing python-asyncio
1个回答
0
投票

在当前代码中,您为每个 URL 调用

fetch_data
,然后等待它返回结果,然后在处理下一个 URL 之前调用
process_data
并等待其结果。这是串行处理,导致没有并发!当然,编码的
process_data
也会阻止事件循环,但是您的代码无论如何都不会运行任何其他任务来被阻止。

您想要做的是创建三个同时调用

fetch_data
的任务,然后当每个
fetch_data
从您想要的 API 获取数据时,让它将该数据提交给
process_data
,以便它可以与其他任务不会阻塞事件循环。

您可以使用 asyncio.gather 创建三个

fetch_data
任务并让它们同时运行。然后,您希望获取 API 返回的结果并将其提交给
process_data
,这将通过调用 loop.run_in_executor 在多处理池中执行。

import asyncio
from time import sleep, time
from concurrent.futures import ProcessPoolExecutor
from os import cpu_count

async def fetch_data(api_url, executor):
    print(f'fetch_data {api_url} started at {time()}')
    # Simulate an API request
    await asyncio.sleep(2)
    loop = asyncio.get_running_loop()
    data = f"Data from {api_url}"
    result = await loop.run_in_executor(executor, process_data, data)
    print(f'fetch_data {api_url} returning results at time {time()}')
    return result

def process_data(data):
    # Simulate CPU-bound data processing
    sleep(3)
    return data.upper()

async def main():
    api_urls = ['api/1', 'api/2', 'api/3']
    # Don't create a pool size larger than what we need
    # nor larger than the number of cores we have:
    pool_size = min(len(api_urls), cpu_count())
    # Create the multiprocessing pool:
    executor = ProcessPoolExecutor(pool_size)

    # Fetch and process data
    results = await asyncio.gather(*(fetch_data(api_url, executor) for api_url in api_urls))
    print(results)

# So this can runder Windows, MacOS, Linux, etc:
if __name__ == '__main__':  # Required for Windows
    asyncio.run(main())

打印:

fetch_data api/1 started at 1729628872.0385263
fetch_data api/2 started at 1729628872.0385263
fetch_data api/3 started at 1729628872.0396993
fetch_data api/1 returning results at time 1729628877.2545536
fetch_data api/2 returning results at time 1729628877.256133
fetch_data api/3 returning results at time 1729628877.2581544
['DATA FROM API/1', 'DATA FROM API/2', 'DATA FROM API/3']
© www.soinside.com 2019 - 2024. All rights reserved.