asyncio:从执行器中的异步函数收集结果

问题描述 投票:0回答:1

我想启动大量 HTTP 请求并在所有请求返回后收集它们的结果。使用

asyncio
可以以非阻塞方式发送请求,但我在收集结果时遇到问题。

我知道针对此特定问题的解决方案,例如 aiohttp。但HTTP请求只是一个例子,我的问题是如何正确使用

asyncio

在服务器端,我有一个 Flask,它用“Hello World!”来回答每个对

localhost/
的请求,但它会在回答之前等待 0.1 秒。在我的所有示例中,我发送了 10 个请求。同步代码大约需要 1 秒,异步版本可以在 0.1 秒内完成。

在客户端,我想同时启动许多请求并收集它们的结果。我正在尝试以三种不同的方式做到这一点。由于 asyncio 需要一个执行器来解决阻塞代码,因此所有方法都调用

loop.run_in_executor

此代码在他们之间共享:

import requests
from time import perf_counter
import asyncio

loop = asyncio.get_event_loop()

async def request_async():
    r = requests.get("http://127.0.0.1:5000/")
    return r.text

def request_sync():
    r = requests.get("http://127.0.0.1:5000/")
    return r.text

方法一:

在任务列表上使用

asyncio.gather()
,然后使用
run_until_complete
。阅读完Asyncio.gather vs asyncio.wait后,看起来聚集会等待结果。但事实并非如此。因此,此代码会立即返回,而无需等待请求完成。 如果我在这里使用阻塞函数,就可以了。为什么我不能使用异步函数?

# approach 1
start = perf_counter()
tasks = []
for i in range(10):
    tasks.append(loop.run_in_executor(None, request_async)) # <---- using async function !

gathered_tasks = asyncio.gather(*tasks)
results = loop.run_until_complete(gathered_tasks)
stop = perf_counter()
print(f"finished {stop - start}") # 0.003

# approach 1(B)
start = perf_counter()
tasks = []
for i in range(10):
    tasks.append(loop.run_in_executor(None, request_sync)) # <---- using sync function

gathered_tasks = asyncio.gather(*tasks)
results = loop.run_until_complete(gathered_tasks)

stop = perf_counter()
print(f"finished {stop - start}") # 0.112

Python 甚至警告我,

coroutine "request_async"
从未被等待过。 此时,我有一个可行的解决方案:在执行器中使用普通(非异步)函数。但我想要一个适用于
async
函数定义的解决方案。因为我想在其中使用
await
(在这个简单的示例中,这不是必需的,但如果我将更多代码移至
asyncio
,我确信它会变得很重要)。

方法2:

Python 警告我,我的协程永远不会被等待。那么,让我们等待他们吧。方法 2 将所有代码包装到外部异步函数中并等待收集结果。同样的问题,也立即返回(也是同样的警告):

# approach 2
async def main():

    tasks = []
    for i in range(10):
        tasks.append(loop.run_in_executor(None, request_async))

    gathered_tasks = asyncio.gather(*tasks)

    return await gathered_tasks # <-------- here I'm waiting on the coroutine 

start = perf_counter()
results = loop.run_until_complete(main())
stop = perf_counter()
print(f"finished {stop - start}")  # 0.0036

这真的让我很困惑。我正在等待

gather
的结果。直观上,这应该传播到我正在收集的协程。但 python 仍然抱怨我的协程从未被等待。

我阅读了更多内容并发现:如何在 asyncio 中使用请求?

这几乎就是我的例子:结合

requests
asyncio
。这让我想到方法 3:

方法3:

与方法 2 的结构相同,但等待单独分配给

run_in_executor()
的每个任务(当然这算作等待协程):

# approach 3:
# wrapping executor in coroutine
# awaiting every task individually
async def main():

    tasks = []
    for i in range(10):
        task = loop.run_in_executor(None, request_async)
        tasks.append(task)

    responses = []
    for task in tasks:
        response = await task
        responses.append(response)

    return responses

start = perf_counter()
results = loop.run_until_complete(main())
stop = perf_counter()

print(f"finished {stop - start}") # 0.004578

我的问题是:我想在我的协程中包含阻塞代码并与执行器并行运行它们。我如何获得他们的结果?

python asynchronous async-await python-asyncio coroutine
1个回答
25
投票

我的问题是:我想在我的协程中包含阻塞代码并与执行器并行运行它们。我如何获得他们的结果?

答案是你的协程中不应该有阻塞代码。如果你必须拥有它,你必须使用

run_in_executor
来隔离它。所以使用
request_async
来写
requests
的正确方法是:

async def request_async():
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, request_sync)

request_async
传递给
run_in_executor
没有意义,因为 run_in_executor 的整个
point
是在不同线程中调用 sync 函数。如果你给它一个协程函数,它会很乐意调用它(在另一个线程中)并提供返回的协程对象作为“结果”。这就像将生成器传递给需要普通函数的代码 - 是的,它会很好地调用生成器,但它不知道如何处理返回的对象。

一般来说,您不能只将

async
放在
def
前面并期望获得可用的协程。协程不得阻塞,除非等待其他异步代码。

一旦您拥有可用的

request_async
,您就可以使用
asyncio.gather
等标准工具收集其结果:

async def main():
    coros = [request_async() for _i in range(10)]
    results = await asyncio.gather(*coros)
    return results

results = loop.run_until_complete(main())
© www.soinside.com 2019 - 2024. All rights reserved.