想象一下,我们有一个返回生成器的原始 API(它实际上是一种从服务器带来结果页面/块的机制,同时向用户提供一个简单的生成器,并让他一个接一个地迭代这些结果。为了简单起见:
# Original sync generator
def get_results():
# fetch from server
yield 1
yield 2
# fetch next page
yield 3
yield 4
# ....
现在需要实现 API 的异步版本,但是我们也需要保持旧 API 的运行。这就是事情变得复杂的地方,我们有点想将异步生成器转换为同步生成器,但我找不到一种优雅的方法来做到这一点。到目前为止,我能做的最好的工作是“首先将所有结果提取到列表中,然后在该列表上提供一个假同步生成器”。哪种违背了目的:
# Async generator
async def get_results_async():
# await fetch from server
yield 1
yield 2
# await fetch next page
yield 3
yield 4
# ....
# Backward compatible sync generator
def get_results():
async def gather_all_results():
res = []
async for i in get_results_async():
res.append(i)
return res
res = asyncio.run(gather_all_results())
for i in res:
yield i
有没有更好、更优雅的方法来做到这一点,而无需在返回之前获取所有结果?
谢谢
由于asyncio具有传染性,很难编写优雅的代码将asyncio代码集成到旧代码中。对于上面的场景,流动的代码好一点,但我认为它不够优雅。
async def get_results_async():
# await fetch from server
yield 1
yield 2
# await fetch next page
yield 3
yield 4
# ....
# Backward compatible sync generator
def get_results():
gen = get_results_async()
while True:
try:
yield asyncio.run(gen.__anext__())
except StopAsyncIteration:
break
并且您可以重复使用事件循环而不是创建新的事件循环。
async def get_results_async():
# await fetch from server
yield 1
yield 2
# await fetch next page
yield 3
yield 4
# ....
# loop that you save in somewhere.
loop = asyncio.get_event_loop()
# Backward compatible sync generator
def get_results():
gen = get_results_async()
while True:
try:
yield loop.run_until_complete(gen.__anext__())
except StopAsyncIteration:
break
接受的答案的缺点是只能在任何异步代码之外工作,这意味着如果您从异步方法调用任何遗留同步方法,它将引发。
另一种具有不同权衡的方法是在后台线程中完成工作。有 asgiref 提供了一个
asgiref.sync.AsyncToSync
适配器来实现此行为,但您可以自己实现一个简化版本。以下允许从同步代码调用异步方法。注意事项:同步调用将在方法运行时阻塞all异步任务;线程敏感的代码将无法正常工作(因为此适配器使用后台线程)。
import asyncio
import queue
import threading
from typing import Coroutine, AsyncGenerator
def async_to_sync(coroutine: Coroutine):
"Run an async method from sync code."
q = queue.Queue(maxsize=1)
async def threadmain():
q.put(await coroutine)
thread = threading.Thread(target=asyncio.run, args=(threadmain(),), daemon=True)
thread.start()
return q.get()
def async_to_sync_gen(gen: AsyncGenerator):
"Convert an async generator into a regular (sync) generator."
q = queue.Queue(maxsize=1)
async def threadmain():
async for item in gen:
q.put((False, item))
q.put((True, None))
thread = threading.Thread(target=asyncio.run, args=(threadmain(),), daemon=True)
thread.start()
while True:
done, item = q.get()
if done:
break
yield item