将异步生成器转换为同步生成器

问题描述 投票:0回答:2

想象一下,我们有一个返回生成器的原始 API(它实际上是一种从服务器带来结果页面/块的机制,同时向用户提供一个简单的生成器,并让他一个接一个地迭代这些结果。为了简单起见:

# Original sync generator
def get_results():
     # fetch from server
     yield 1
     yield 2
     # fetch next page
     yield 3
     yield 4
     # ....

现在需要实现 API 的异步版本,但是我们也需要保持旧 API 的运行。这就是事情变得复杂的地方,我们有点想将异步生成器转换为同步生成器,但我找不到一种优雅的方法来做到这一点。到目前为止,我能做的最好的工作是“首先将所有结果提取到列表中,然后在该列表上提供一个假同步生成器”。哪种违背了目的:

# Async generator
async def get_results_async():
     # await fetch from server
     yield 1
     yield 2
     # await fetch next page
     yield 3
     yield 4
     # ....


# Backward compatible sync generator
def get_results():

    async def gather_all_results():
        res = []
        async for i in get_results_async():
            res.append(i)
        return res

    res = asyncio.run(gather_all_results())
    for i in res:
        yield i

有没有更好、更优雅的方法来做到这一点,而无需在返回之前获取所有结果?

谢谢

python generator python-asyncio
2个回答
3
投票

由于asyncio具有传染性,很难编写优雅的代码将asyncio代码集成到旧代码中。对于上面的场景,流动的代码好一点,但我认为它不够优雅。

async def get_results_async():
    # await fetch from server
    yield 1
    yield 2
    # await fetch next page
    yield 3
    yield 4
    # ....


# Backward compatible sync generator
def get_results():
    gen = get_results_async()
    while True:
        try:
            yield asyncio.run(gen.__anext__())
        except StopAsyncIteration:
            break 

并且您可以重复使用事件循环而不是创建新的事件循环。

async def get_results_async():
    # await fetch from server
    yield 1
    yield 2
    # await fetch next page
    yield 3
    yield 4
    # ....

# loop that you save in somewhere.
loop = asyncio.get_event_loop()

# Backward compatible sync generator
def get_results():
    gen = get_results_async()
    while True:
        try:
            yield loop.run_until_complete(gen.__anext__())
        except StopAsyncIteration:
            break 

0
投票

接受的答案的缺点是只能在任何异步代码之外工作,这意味着如果您从异步方法调用任何遗留同步方法,它将引发。

另一种具有不同权衡的方法是在后台线程中完成工作。有 asgiref 提供了一个

asgiref.sync.AsyncToSync
适配器来实现此行为,但您可以自己实现一个简化版本。以下允许从同步代码调用异步方法。注意事项:同步调用将在方法运行时阻塞all异步任务;线程敏感的代码将无法正常工作(因为此适配器使用后台线程)。

import asyncio
import queue
import threading
from typing import Coroutine, AsyncGenerator

def async_to_sync(coroutine: Coroutine):
    "Run an async method from sync code."
    q = queue.Queue(maxsize=1)

    async def threadmain():
        q.put(await coroutine)

    thread = threading.Thread(target=asyncio.run, args=(threadmain(),), daemon=True)
    thread.start()
    return q.get()


def async_to_sync_gen(gen: AsyncGenerator):
    "Convert an async generator into a regular (sync) generator."
    q = queue.Queue(maxsize=1)

    async def threadmain():
        async for item in gen:
            q.put((False, item))
        q.put((True, None))

    thread = threading.Thread(target=asyncio.run, args=(threadmain(),), daemon=True)
    thread.start()

    while True:
        done, item = q.get()
        if done:
            break
        yield item
© www.soinside.com 2019 - 2024. All rights reserved.