如何在Python中使用`async for`?

问题描述 投票:0回答:4

我的意思是使用

async for
我能得到什么。这是我用
async for
编写的代码,
AIter(10)
可以替换为
get_range()

但是代码运行起来像同步而不是异步。

import asyncio

async def get_range():
    for i in range(10):
        print(f"start {i}")
        await asyncio.sleep(1)
        print(f"end {i}")
        yield i

class AIter:
    def __init__(self, N):
        self.i = 0
        self.N = N

    def __aiter__(self):
        return self

    async def __anext__(self):
        i = self.i
        print(f"start {i}")
        await asyncio.sleep(1)
        print(f"end {i}")
        if i >= self.N:
            raise StopAsyncIteration
        self.i += 1
        return i

async def main():
    async for p in AIter(10):
        print(f"finally {p}")

if __name__ == "__main__":
    asyncio.run(main())

我排除的结果应该是:

start 1
start 2
start 3
...
end 1
end 2
...
finally 1
finally 2
...

然而,真正的结果是:

start 0
end 0
finally 0
start 1
end 1
finally 1
start 2
end 2

我知道我可以通过使用

asyncio.gather
asyncio.wait
获得例外结果。

但是我很难理解在这里使用

async for
而不是简单的
for
得到了什么。

如果我想循环多个

async for
对象并在一个完成后立即使用它们,那么使用
Feature
的正确方法是什么。例如:

async for f in feature_objects:
    data = await f
    with open("file", "w") as fi:
        fi.write()
python asynchronous python-asyncio
4个回答
152
投票

但是我很难理解在这里使用

async for
而不是简单的
for
得到了什么。

潜在的误解是期望

async for
自动并行化迭代。它并没有这样做,它只是允许在异步源上进行顺序迭代。例如,您可以使用 async for 迭代来自 TCP 流的行、来自 Websocket 的消息或来自异步数据库驱动程序的数据库记录。
以上都不适用于普通的 

for

,至少在不阻塞事件循环的情况下是这样。这是因为

for
__next__
 作为阻塞函数调用,并且不等待其结果。您无法手动删除由 
await
获得的
for
元素,因为
for
希望
__next__
通过升高
StopIteration
来表示迭代结束。如果
__next__
是协程,则
StopIteration
异常在等待它之前将不可见。这就是为什么引入
async for
的原因,不仅在 Python 中,而且在具有 async/await 和通用化
for
other languages 中。
如果要并行运行循环迭代,则需要将它们作为并行协程启动,并使用 

asyncio.as_completed

 或等效方法来检索结果:
async def x(i): print(f"start {i}") await asyncio.sleep(1) print(f"end {i}") return i # run x(0)..x(10) concurrently and process results as they arrive for f in asyncio.as_completed([x(i) for i in range(10)]): result = await f # ... do something with the result ...

如果您不关心结果到达时立即做出反应,但您需要全部结果,则可以使用 
asyncio.gather

:

 使其变得更简单
# run x(0)..x(10) concurrently and process results when all are done results = await asyncio.gather(*[x(i) for i in range(10)])



12
投票
Charlie

的赏金)。 假设您想同时使用每个产生的值,一个简单的方法是:

import asyncio async def process_all(): tasks = [] async for obj in my_async_generator: # Python 3.7+. Use ensure_future for older versions. task = asyncio.create_task(process_obj(obj)) tasks.append(task) await asyncio.gather(*tasks) async def process_obj(obj): ...

说明:

考虑以下代码,不带

create_task

async def process_all():
    async for obj in my_async_generator:
        await process_obj(obj))

这大致相当于:

async def process_all(): obj1 = await my_async_generator.__anext__(): await process_obj(obj1)) obj2 = await my_async_generator.__anext__(): await process_obj(obj2)) ...

基本上,循环无法继续,因为它的主体阻塞了。正确的方法是将每次迭代的处理委托给一个新的异步任务,该任务将在不阻塞循环的情况下启动。然后,
gather

等待所有任务 - 这意味着要处理每个迭代。

    


0
投票
async for

不会创建要同时运行的任务。它用于允许对异步源进行顺序迭代。

例如,在 

aiokafka

中,您可以执行

async for msg in consumer
。 每次迭代都会调用
__anext__
中的
consumer
方法。该方法定义为
async def __anext__
,允许在其中调用
await self.get_one()
相比之下,当您使用普通的 for 循环时,它会在内部调用 

__next__

特殊方法。但是,常规

__next__
方法不支持等待异步源,例如使用
await get_one()
    


-1
投票
很棒的答案

的代码,只是缺少异步生成器以使其可运行,一旦我有了它(或者如果有人想贡献它)就会完成这个: import time import asyncio async def process_all(): """ Example where the async for loop allows to loop through concurrently many things without blocking on each individual iteration but blocks (waits) for all tasks to run. ref: - https://stackoverflow.com/questions/56161595/how-to-use-async-for-in-python/72758067#72758067 """ tasks = [] async for obj in my_async_generator: # Python 3.7+. Use ensure_future for older versions. task = asyncio.create_task(process_obj(obj)) # concurrently dispatches a coroutine to be executed. tasks.append(task) await asyncio.gather(*tasks) async def process_obj(obj): await asyncio.sleep(5) # expensive IO if __name__ == '__main__': # - test asyncio s = time.perf_counter() asyncio.run(process_all()) # - print stats elapsed = time.perf_counter() - s print(f"{__file__} executed in {elapsed:0.2f} seconds.") print('Success, done!\a')

© www.soinside.com 2019 - 2024. All rights reserved.