异步Python itertools链多个生成器

问题描述 投票:0回答:3

为了清晰起见更新了问题:

假设我有 2 个处理生成器函数:

def gen1(): # just for examples,
  yield 1   # yields actually carry 
  yield 2   # different computation weight 
  yield 3   # in my case

def gen2():
  yield 4
  yield 5
  yield 6

我可以用itertools链接它们

from itertools import chain

mix = chain(gen1(), gen2())

然后我可以用它创建另一个生成器函数对象,

def mix_yield():
   for item in mix:
      yield item

或者只是如果我只想

next(mix)
,它就在那里。

我的问题是,如何在异步代码中执行等效操作?

因为我需要它:

  • 以yield方式返回(一一),或者使用
    next
    迭代器
  • 最快解决的产量优先(异步)

上一页。更新:

经过实验和研究,我发现了 aiostream 库,它声明为 itertools 的异步版本,所以我做了什么:

import asyncio
from aiostream import stream

async def gen1(): 
     await asyncio.sleep(0) 
     yield 1 
     await asyncio.sleep(0) 
     yield 2 
     await asyncio.sleep(0) 
     yield 3 

async def gen2(): 
     await asyncio.sleep(0) 
     yield 4 
     await asyncio.sleep(0) 
     yield 5 
     await asyncio.sleep(0) 
     yield 6 

a_mix = stream.combine.merge(gen1(),gen2())

async def a_mix_yield():
   for item in a_mix:
      yield item

但我还是做不到

next(a_mix)

TypeError: 'merge' object is not an iterator

next(await a_mix)

raise StreamEmpty()

虽然我仍然可以将其放入列表中:

print(await stream.list(a_mix))
# [1, 2, 4, 3, 5, 6]

所以一个目标已经完成,还有一个目标:

  • 以yield方式返回(一一),或者使用

    next
    迭代器

    - 最快解决的产量优先(异步)

python python-3.x asynchronous python-asyncio sequence-generators
3个回答
12
投票

Python 的

next
内置函数只是调用对象底层
__next__
方法的便捷方法。
__next__
的异步等效项是异步迭代器上的
__anext__
方法。标准库中没有
anext
全局函数(aiostream提供了一个),但可以轻松编写它:

async def anext(aiterator):
    return await aiterator.__anext__()

但是节省的费用太少了,在极少数需要这样做的情况下,人们也可以直接调用

__anext__
。异步迭代器又是通过调用 __aiter__ 从异步
iterable
获得的(类似于常规迭代器提供的
__iter__
)。手动驱动的异步迭代如下所示:

a_iterator = obj.__aiter__()          # regular method
elem1 = await a_iterator.__anext__()  # async method
elem2 = await a_iterator.__anext__()  # async method
...
当没有更多元素可用时,

__anext__
将引发
StopAsyncIteration
。要循环异步迭代器,应该使用
async for

这是一个可运行的示例,基于您的代码,使用

__anext__
async for
来耗尽使用
aiostream.stream.combine.merge
设置的流:

async def main():
    a_mix = stream.combine.merge(gen1(), gen2())
    async with a_mix.stream() as streamer:
        mix_iter = streamer.__aiter__()    
        print(await mix_iter.__anext__())
        print(await mix_iter.__anext__())
        print('remaining:')
        async for x in mix_iter:
            print(x)

asyncio.get_event_loop().run_until_complete(main())

2
投票

我遇到了这个答案,我查看了 aiostream 库。这是我想出的用于合并多个异步生成器的代码。它不使用任何库。

async def merge_generators(gens:Set[AsyncGenerator[Any, None]]) -> AsyncGenerator[Any, None]:
    pending = gens.copy()
    pending_tasks = { asyncio.ensure_future(g.__anext__()): g for g in pending }
    while len(pending_tasks) > 0:
        done, _ = await asyncio.wait(pending_tasks.keys(), return_when="FIRST_COMPLETED")
        for d in done:
            try:
                result = d.result()
                yield result
                dg = pending_tasks[d]
                pending_tasks[asyncio.ensure_future(dg.__anext__())] = dg
            except StopAsyncIteration as sai:
                print("Exception in getting result", sai)
            finally:
                del pending_tasks[d]

希望这对您有帮助,如果有任何错误请告诉我。


0
投票

我已经成功地使用这些简单的助手合并和链接了几个异步生成器。

可以应用不同的错误策略,所以我选择不显示任何错误 为了清楚起见,我没有将错误处理显示为不同的

import asyncio


async def merge(*streams):
    n = len(streams)
    queue = asyncio.Queue()
    signal = object()
    async def enqueue(stream):
        async for event in stream:
            await queue.put(event)
        await queue.put(signal)
    tasks = [asyncio.create_task(enqueue(stream)) for stream in streams]
    while n > 0:
        event = await queue.get()
        if event is signal:
            n -= 1
        else:
            yield event
    await asyncio.wait(tasks)


async def chain(*streams):
    for stream in streams:
        async for item in stream:
            yield item

使用示例:

async def gen(name, n, nap):
    for i in range(n):
        await asyncio.sleep(nap)
        yield f"Event #{i} for {name}"


async def main():
    print("Merging 2 async generators")
    g1 = gen("task A", 3, 0.5)
    g2 = gen("task B", 6, 0.3)
    async for item in merge(g1, g2):
        print(f"  {item}")

    print("Chaining 2 async generators")
    g1 = gen("task A", 3, 0.5)
    g2 = gen("task B", 6, 0.3)
    async for item in chain(g1, g2):
        print(f"  {item}")
© www.soinside.com 2019 - 2024. All rights reserved.