我的意思是使用
async for
我能得到什么。这是我用async for
编写的代码,AIter(10)
可以替换为get_range()
。
但是代码运行起来像同步而不是异步。
import asyncio
async def get_range():
for i in range(10):
print(f"start {i}")
await asyncio.sleep(1)
print(f"end {i}")
yield i
class AIter:
def __init__(self, N):
self.i = 0
self.N = N
def __aiter__(self):
return self
async def __anext__(self):
i = self.i
print(f"start {i}")
await asyncio.sleep(1)
print(f"end {i}")
if i >= self.N:
raise StopAsyncIteration
self.i += 1
return i
async def main():
async for p in AIter(10):
print(f"finally {p}")
if __name__ == "__main__":
asyncio.run(main())
我排除的结果应该是:
start 1
start 2
start 3
...
end 1
end 2
...
finally 1
finally 2
...
然而,真正的结果是:
start 0
end 0
finally 0
start 1
end 1
finally 1
start 2
end 2
我知道我可以通过使用
asyncio.gather
或 asyncio.wait
获得例外结果。
但是我很难理解在这里使用
async for
而不是简单的for
得到了什么。
如果我想循环多个
async for
对象并在一个完成后立即使用它们,那么使用 Feature
的正确方法是什么。例如:
async for f in feature_objects:
data = await f
with open("file", "w") as fi:
fi.write()
但是我很难理解在这里使用
而不是简单的async for
得到了什么。for
async for
自动并行化迭代。它并没有这样做,它只是允许在异步源上进行顺序迭代。例如,您可以使用 async for
迭代来自 TCP 流的行、来自 Websocket 的消息或来自异步数据库驱动程序的数据库记录。以上都不适用于普通的 for
,至少在不阻塞事件循环的情况下是这样。这是因为
for
将 __next__
作为阻塞函数调用,并且不等待其结果。您无法手动删除由
await
获得的 for
元素,因为 for
希望 __next__
通过升高 StopIteration
来表示迭代结束。如果 __next__
是协程,则 StopIteration
异常在等待它之前将不可见。这就是为什么引入 async for
的原因,不仅在 Python 中,而且在具有 async/await 和通用化 for
的 other languages 中。如果要并行运行循环迭代,则需要将它们作为并行协程启动,并使用 或等效方法来检索结果:
async def x(i):
print(f"start {i}")
await asyncio.sleep(1)
print(f"end {i}")
return i
# run x(0)..x(10) concurrently and process results as they arrive
for f in asyncio.as_completed([x(i) for i in range(10)]):
result = await f
# ... do something with the result ...
如果您不关心结果到达时立即做出反应,但您需要全部结果,则可以使用
asyncio.gather
:
使其变得更简单
# run x(0)..x(10) concurrently and process results when all are done
results = await asyncio.gather(*[x(i) for i in range(10)])
的赏金)。 假设您想同时使用每个产生的值,一个简单的方法是:
import asyncio
async def process_all():
tasks = []
async for obj in my_async_generator:
# Python 3.7+. Use ensure_future for older versions.
task = asyncio.create_task(process_obj(obj))
tasks.append(task)
await asyncio.gather(*tasks)
async def process_obj(obj):
...
说明:
create_task
:
async def process_all():
async for obj in my_async_generator:
await process_obj(obj))
这大致相当于:
async def process_all():
obj1 = await my_async_generator.__anext__():
await process_obj(obj1))
obj2 = await my_async_generator.__anext__():
await process_obj(obj2))
...
基本上,循环无法继续,因为它的主体阻塞了。正确的方法是将每次迭代的处理委托给一个新的异步任务,该任务将在不阻塞循环的情况下启动。然后,
gather
等待所有任务 - 这意味着要处理每个迭代。
async for
不会创建要同时运行的任务。它用于允许对异步源进行顺序迭代。
例如,在aiokafka
中,您可以执行
async for msg in consumer
。
每次迭代都会调用 __anext__
中的 consumer
方法。该方法定义为 async def __anext__
,允许在其中调用 await self.get_one()
。相比之下,当您使用普通的 for 循环时,它会在内部调用 __next__
特殊方法。但是,常规
__next__
方法不支持等待异步源,例如使用 await get_one()
。的代码,只是缺少异步生成器以使其可运行,一旦我有了它(或者如果有人想贡献它)就会完成这个:
import time
import asyncio
async def process_all():
"""
Example where the async for loop allows to loop through concurrently many things without blocking on each individual
iteration but blocks (waits) for all tasks to run.
ref:
- https://stackoverflow.com/questions/56161595/how-to-use-async-for-in-python/72758067#72758067
"""
tasks = []
async for obj in my_async_generator:
# Python 3.7+. Use ensure_future for older versions.
task = asyncio.create_task(process_obj(obj)) # concurrently dispatches a coroutine to be executed.
tasks.append(task)
await asyncio.gather(*tasks)
async def process_obj(obj):
await asyncio.sleep(5) # expensive IO
if __name__ == '__main__':
# - test asyncio
s = time.perf_counter()
asyncio.run(process_all())
# - print stats
elapsed = time.perf_counter() - s
print(f"{__file__} executed in {elapsed:0.2f} seconds.")
print('Success, done!\a')