我正在寻找
asyncio.create_task
等价于 AsyncGenerator
。我希望生成器已经开始在后台执行,而无需显式等待结果。
例如:
async def g():
for i in range(3):
await asyncio.sleep(1.0)
yield i
async def main():
g1 = g()
g2 = g()
t = time.time()
async for i in g1:
print(i, time.time() - t)
async for i in g2:
print(i, time.time() - t)
执行需要6秒:
0 1.001204013824463
1 2.0024218559265137
2 3.004373788833618
0 4.00572395324707
1 5.007828950881958
2 6.009296894073486
如果两个生成器并行执行,则总执行时间只需约 3 秒。这里推荐的方法是什么?
您想创建两个任务并并行运行它们。这是一种方法。每个任务都涉及一个异步生成器并不重要。
import asyncio
import time
async def g():
for i in range(3):
await asyncio.sleep(1.0)
yield i
async def a_task(t):
async for i in g():
print(i, time.time() - t)
async def main():
t = time.time()
await asyncio.gather(a_task(t), a_task(t))
print("Total time", time.time() - t)
asyncio.run(main())
结果:
0 1.0009820461273193
0 1.0009820461273193
1 2.001558303833008
1 2.001558303833008
2 3.001593828201294
2 3.001593828201294
在您的示例中,您在开始第二个循环之前打印了第一个循环中的各个项目,但我认为这不是您真正想要的。
这是我想出的方法:
async def consume(a_iter: AsyncIterator) -> Tuple[Any, Optional[asyncio.Task]]:
try:
return await a_iter.__anext__(), asyncio.create_task(consume(a_iter))
except StopAsyncIteration:
return None, None
def create_generator_task(gen: AsyncGenerator) -> AsyncGenerator:
result_queue = asyncio.create_task(consume(gen.__aiter__()))
async def consumer():
nonlocal result_queue
while 1:
item, result_queue = await result_queue
if result_queue is None:
assert item is None
return
yield item
return consumer()
如果我缺少标准库中没有任何官方解决方案,则此方法效果很好。 例子:
async def main():
g1 = create_generator_task(g())
g2 = create_generator_task(g())
t = time.time()
async for i in g1:
print(i, time.time() - t)
async for i in g2:
print(i, time.time() - t)
印花:
0 1.0013220310211182
1 2.002387046813965
2 3.0078349113464355
0 3.00785493850708
1 3.007858991622925
2 3.007862091064453