我的程序工作流程如下:1. IO-bound(网页获取)-> 2. cpu-bound(处理信息)-> 3. IO-bound(将结果写入数据库)。
我目前正在使用 aiohttp 来获取网页。我目前正在使用 asyncio.as_completed 收集第 1 步任务并将其传递到已完成的第 2 步。我担心的是,这可能会通过消耗 CPU 资源并阻塞步骤 2 中的程序流程来干扰步骤 1 任务的完成。
我尝试使用 ProcessPoolExecutor 将第 2 步任务分派给其他进程,但第 2 步任务使用不可pickleable 的数据结构和函数。我已经尝试过 ThreadPoolExecutor,虽然它有效(例如它没有崩溃),但我的理解是,对于 CPU 密集型任务这样做会适得其反。
由于工作流程具有中间 cpu 密集型任务,因此在进入步骤 2 之前使用 asyncio.gather(而不是 asyncio.as_completed)完成所有步骤 1 流程会更高效吗?
示例 asyncio.as_completed 代码:
async with ClientSession() as session:
tasks = {self.fetch(session, url) for url in self.urls}
for task in asyncio.as_completed(tasks):
raw_data = await asyncio.shield(task)
data = self.extract_data(*raw_data)
await self.store_data(data)
示例 asyncio.gather 代码:
async with ClientSession() as session:
tasks = {self.fetch(session, url) for url in self.urls}
results = await asyncio.gather(*tasks)
for result in results:
data = self.extract_data(*result)
await self.store_data(data)
有限样本的初步测试表明 as_completed 比 Gather 稍微高效:~2.98s (as_completed) vs ~3.15s (gather)。但是是否存在一个异步概念问题会倾向于一种解决方案而不是另一种解决方案?
“我尝试过 ThreadPoolExecutor,[...] 据我了解,对于 CPU 密集型任务这样做会适得其反。” - 从某种意义上说,你不会有两个这样的要求并行运行 Python 代码,使用多个 CPU 核心 - 但否则,它将释放你的异步循环以继续工作,如果只是咀嚼一个代码一次完成任务。
如果您无法将内容放入子进程,那么在 ThreadPoolExecutor 中运行 CPU 密集型任务就足够了。
否则,只需在 cpu 代码中撒上一些
await asyncio.sleep(0)
(在循环内),然后将它们作为协程正常运行:这足以让 cpu 绑定任务不锁定 asyncio 循环。
名称
gather
和 as_completed
对我来说是不言自明的。
gather
会等待所有任务完成后再继续,而 as_completed
更积极地尝试知道哪些等待任务已完成,并为用户(开发人员)提供了利用已完成任务生成的结果的机会任务有机会处理。仅在循环体完成后,for
循环才能检查下一次迭代中哪个其他任务已完成。