我试图了解如何制作一个可等待的对象。 文档中的定义指出:
具有返回迭代器的 __await__ 方法的对象。
在该定义的指导下,我编写了示例代码:
import asyncio
async def produce_list():
num = await Customer()
print(num)
class Customer(object):
def __await__(self):
return iter([1, 2, 3, 4])
loop = asyncio.get_event_loop()
loop.run_until_complete(produce_list())
我预期的流程是:
produce_list()
。 produce_list()
放弃执行 num = await Customer()
。Customer()
被执行并返回一个迭代器。因为它返回迭代器中的第一个值。 Q1:我不清楚为什么num
没有成为迭代器本身。还有这里的send
是做什么的?num = 4
协程继续执行到print(num)
,并打印值 4。我得到了什么:
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
~/workspace/dashboard/so_question_await.py in <module>()
16
17 loop = asyncio.get_event_loop()
---> 18 loop.run_until_complete(produce_list())
/usr/lib/python3.5/asyncio/base_events.py in run_until_complete(self, future)
464 raise RuntimeError('Event loop stopped before Future completed.')
465
--> 466 return future.result()
467
468 def stop(self):
/usr/lib/python3.5/asyncio/futures.py in result(self)
291 self._tb_logger = None
292 if self._exception is not None:
--> 293 raise self._exception
294 return self._result
295
/usr/lib/python3.5/asyncio/tasks.py in _step(***failed resolving arguments***)
239 result = coro.send(None)
240 else:
--> 241 result = coro.throw(exc)
242 except StopIteration as exc:
243 self.set_result(exc.value)
~/workspace/dashboard/so_question_await.py in produce_list()
5
6 async def produce_list():
----> 7 num = await Customer()
8 print(num)
9
RuntimeError: Task got bad yield: 1
我在这里弄错了什么概念?
最后,我正在寻找一个示例,该示例使用列表迭代作为事件来返回协程的控制。
__await__
返回一个迭代器,因为协程的底层机制最初是基于 yield from
语法的。实际上,__await__
返回iter(some_future)
或some_coroutine.__await__()
。它可用于创建每次等待时都会产生不同值的对象。看这个简单的例子:
import asyncio
import random
class RandomProducer:
def __await__(self):
return self.producer().__await__()
async def producer(self):
sleep = random.random()
value = random.randint(0, 9)
return await asyncio.sleep(sleep, result=value)
async def main():
producer = RandomProducer()
while True:
print(await producer)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
回答您的意见:
每个协程最终都会调用
吗?asyncio.sleep
不,
asyncio.sleep
实际上不是链条的末端。在最底层,总是产生一个 future:协程链询问事件循环“当这个 future 有结果时请叫醒我”。在 asyncio.sleep
的情况下,它使用 loop.call_later
来设置给定时间后的未来结果。循环提供了更多调度回调的方法:loop.call_at
、loop.add_reader
、loop.add_writer
、loop.add_signal_handler
等。
asyncio 库,例如 aiohttp。我假设某个地方有一些代码不依赖于先前协程的存在。
所有的IO操作最终都必须委托给事件循环才能实现单线程并发。 例如,aiohttp 依赖于 loop.create_connection 协程来管理 TCP 连接。
@Vincent 的答案可能就是您想要的,因为它会让您使用
__await__
方法来调用更多异步代码。
但是,稍微搜索一下 python 的 Tasks 实现,就会发现 async 返回的迭代器应该返回 None(将控制发送回事件循环),或者引发一个异常,其值就是它们的返回值。
所以,以下工作:
class MyIter:
def __init__(self):
self.n = 2
def __next__(self):
self.n -= 1
if self.n >= 0:
return None # yields control to outer loop
raise StopIteration(12) # "return" value stored in exc.value
class MyTask:
def __await__(self):
return MyIter()
async def test():
ans = await MyTask()
print(ans)
所有 asyncio 都是通过顺序任务迭代构建的。您可以使用
__await__
方法创建自定义迭代器/生成器,或者您可以通过老式方式创建协程:
import types, asyncio
@types.coroutine
def arange(start):
numbers = []
for i in range(start):
numbers.append(i)
yield # always should be None
return numbers
async def main():
return await arange(10)
print(asyncio.run(main()))
如果你想制作异步发电机,请阅读this
为了更好地理解,您可以将 asyncio event_loop 想象为运行生成器的双端队列,这是一个简单的示例:
from collections import deque
def arange(stop):
for i in range(1, stop + 1, 1):
print(i, "/", stop)
yield
else:
return i
def gather(*tasks):
return_values = {task: None for task in tasks}
events = deque(tasks)
while events:
try:
task = events.pop()
next(task)
events.appendleft(task)
except StopIteration as exc:
return_values[task] = exc.value
return tuple(return_values.values())
result1, result2 = gather(arange(3), arange(5))
print(result1, result2)
结果与您使用
arange
声明 types.coroutine
并使用 await asyncio.gather(*tasks)
调用这些等待项相同