I'm working on a Python project where I need to handle both asynchronous I/O tasks (such as reading files or making API requests) and CPU-bound tasks (such as data processing or computation). I'm struggling to find the best way to combine the two effectively.
The problem:
For example, in this code:
import asyncio
from time import sleep

async def fetch_data(api_url):
    # Simulate an API request
    await asyncio.sleep(2)
    return f"Data from {api_url}"

def process_data(data):
    # Simulate CPU-bound data processing
    sleep(3)
    return data.upper()

async def main():
    api_urls = ['api/1', 'api/2', 'api/3']
    # Fetch and process data
    tasks = []
    for url in api_urls:
        data = await fetch_data(url)
        result = process_data(data)  # Blocking the event loop here
        tasks.append(result)
    print(tasks)

asyncio.run(main())
This example blocks the event loop while processing the data, which makes it inefficient. I know I can use concurrent.futures for the CPU-bound part, but I'm not sure how to mix it with asyncio effectively.
How can I efficiently combine asynchronous I/O tasks (like await fetch_data()) with CPU-bound tasks (like process_data())? What I expect: the fetches to run concurrently, with the CPU-bound processing kept off the event loop.
In your current code, you call fetch_data for each URL and await its result, then call process_data and wait for its result before moving on to the next URL. That is serial processing, so there is no concurrency at all! Granted, process_data as written also blocks the event loop, but your code isn't running any other tasks that could be blocked anyway.
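To see the cost of the serial pattern in isolation (leaving the CPU-bound work aside), here is a minimal, scaled-down sketch comparing serial awaits against asyncio.gather; the 0.1-second sleep is a stand-in for the 2-second API call:

```python
import asyncio
from time import perf_counter

async def fetch_data(api_url):
    # Scaled-down stand-in for the 2-second API call
    await asyncio.sleep(0.1)
    return f"Data from {api_url}"

async def serial():
    # One await at a time: total time is the sum of all fetches
    results = []
    for url in ['api/1', 'api/2', 'api/3']:
        results.append(await fetch_data(url))
    return results

async def concurrent():
    # gather runs all three fetches at once: total time is ~one fetch
    return await asyncio.gather(*(fetch_data(u) for u in ['api/1', 'api/2', 'api/3']))

start = perf_counter()
serial_results = asyncio.run(serial())
serial_time = perf_counter() - start

start = perf_counter()
concurrent_results = asyncio.run(concurrent())
concurrent_time = perf_counter() - start

print(f"serial: {serial_time:.2f}s, concurrent: {concurrent_time:.2f}s")
```

The serial version takes roughly three times as long as the gathered version, even though no CPU-bound work is involved yet.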
What you want to do is create three tasks that call fetch_data concurrently, and then, as each fetch_data retrieves data from its API, have it submit that data to process_data so the processing can run alongside the other tasks without blocking the event loop.
You can use asyncio.gather to create the three fetch_data tasks and run them concurrently. Then you take the result each API returns and submit it to process_data, which is executed in a multiprocessing pool via loop.run_in_executor.
import asyncio
from time import sleep, time
from concurrent.futures import ProcessPoolExecutor
from os import cpu_count

async def fetch_data(api_url, executor):
    print(f'fetch_data {api_url} started at {time()}')
    # Simulate an API request
    await asyncio.sleep(2)
    loop = asyncio.get_running_loop()
    data = f"Data from {api_url}"
    result = await loop.run_in_executor(executor, process_data, data)
    print(f'fetch_data {api_url} returning results at time {time()}')
    return result

def process_data(data):
    # Simulate CPU-bound data processing
    sleep(3)
    return data.upper()

async def main():
    api_urls = ['api/1', 'api/2', 'api/3']
    # Don't create a pool larger than what we need,
    # nor larger than the number of cores we have
    # (cpu_count() can return None, so fall back to 1):
    pool_size = min(len(api_urls), cpu_count() or 1)
    # Create the multiprocessing pool:
    executor = ProcessPoolExecutor(pool_size)
    # Fetch and process data
    results = await asyncio.gather(*(fetch_data(api_url, executor) for api_url in api_urls))
    print(results)

# So this can run under Windows, macOS, Linux, etc.:
if __name__ == '__main__':  # Required for Windows
    asyncio.run(main())
Prints:
fetch_data api/1 started at 1729628872.0385263
fetch_data api/2 started at 1729628872.0385263
fetch_data api/3 started at 1729628872.0396993
fetch_data api/1 returning results at time 1729628877.2545536
fetch_data api/2 returning results at time 1729628877.256133
fetch_data api/3 returning results at time 1729628877.2581544
['DATA FROM API/1', 'DATA FROM API/2', 'DATA FROM API/3']
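One side note beyond the code above: if the CPU-bound work happens inside a C extension that releases the GIL (NumPy, for example), you can pass None as the executor so run_in_executor uses the event loop's default ThreadPoolExecutor, avoiding the cost of pickling data between processes. A minimal sketch, using time.sleep (which releases the GIL) as a stand-in for such work, with scaled-down delays:

```python
import asyncio
from time import sleep

def process_data(data):
    # Stand-in for CPU work that releases the GIL (e.g. NumPy)
    sleep(0.1)
    return data.upper()

async def fetch_and_process(api_url):
    await asyncio.sleep(0.1)  # simulated API request
    loop = asyncio.get_running_loop()
    # None -> the loop's default ThreadPoolExecutor
    return await loop.run_in_executor(None, process_data, f"Data from {api_url}")

async def main():
    return await asyncio.gather(
        *(fetch_and_process(u) for u in ['api/1', 'api/2', 'api/3']))

results = asyncio.run(main())
print(results)
```

For pure-Python CPU-bound code, though, threads won't buy you any parallelism because of the GIL; stick with the ProcessPoolExecutor shown above.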