asyncio 可等待对象 - 基本示例

问题描述 投票:0回答:3

我试图了解如何制作一个可等待的对象。 文档中的定义指出:

具有返回迭代器的 __await__ 方法的对象。

在该定义的指导下,我编写了示例代码:

import asyncio

async def produce_list():
        num = await Customer()
        print(num)

class Customer(object):

    def __await__(self):
        return iter([1, 2, 3, 4])

loop = asyncio.get_event_loop()
loop.run_until_complete(produce_list())

我预期的流程是:

  1. 事件循环将控制权交给
    produce_list()
    produce_list()
    放弃执行
    num = await Customer()
  2. Customer()
    被执行并返回一个迭代器。因为它返回迭代器中的第一个值。 Q1:我不清楚为什么
    num
    没有成为迭代器本身。还有这里的
    send
    是做什么的?
  3. 一旦达到迭代器的最后一个值。
    num = 4
    协程继续执行到
    print(num)
    ,并打印值 4。

我得到了什么:

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
~/workspace/dashboard/so_question_await.py in <module>()
     16 
     17 loop = asyncio.get_event_loop()
---> 18 loop.run_until_complete(produce_list())

/usr/lib/python3.5/asyncio/base_events.py in run_until_complete(self, future)
    464             raise RuntimeError('Event loop stopped before Future completed.')
    465 
--> 466         return future.result()
    467 
    468     def stop(self):

/usr/lib/python3.5/asyncio/futures.py in result(self)
    291             self._tb_logger = None
    292         if self._exception is not None:
--> 293             raise self._exception
    294         return self._result
    295 

/usr/lib/python3.5/asyncio/tasks.py in _step(***failed resolving arguments***)
    239                 result = coro.send(None)
    240             else:
--> 241                 result = coro.throw(exc)
    242         except StopIteration as exc:
    243             self.set_result(exc.value)

~/workspace/dashboard/so_question_await.py in produce_list()
      5 
      6 async def produce_list():
----> 7         num = await Customer()
      8         print(num)
      9 

RuntimeError: Task got bad yield: 1

我在这里弄错了什么概念?

最后,我正在寻找一个示例,该示例使用列表迭代作为事件来返回协程的控制。

python python-3.x asynchronous async-await python-asyncio
3个回答
11
投票

__await__
返回一个迭代器,因为协程的底层机制最初是基于
yield from
语法的。实际上,
__await__
返回
iter(some_future)
some_coroutine.__await__()
。它可用于创建每次等待时都会产生不同值的对象。看这个简单的例子:

import asyncio
import random

class RandomProducer:

    def __await__(self):
        return self.producer().__await__()

    async def producer(self):
        sleep = random.random()
        value = random.randint(0, 9)
        return await asyncio.sleep(sleep, result=value)

async def main():
    producer = RandomProducer()
    while True:
        print(await producer)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

回答您的意见:

每个协程最终都会调用

asyncio.sleep
吗?

不,

asyncio.sleep
实际上不是链条的末端。在最底层,总是产生一个 future:协程链询问事件循环“当这个 future 有结果时请叫醒我”。在
asyncio.sleep
的情况下,它使用
loop.call_later
来设置给定时间后的未来结果。循环提供了更多调度回调的方法:
loop.call_at
loop.add_reader
loop.add_writer
loop.add_signal_handler
等。

asyncio 库,例如 aiohttp。我假设某个地方有一些代码不依赖于先前协程的存在。

所有的IO操作最终都必须委托给事件循环才能实现单线程并发。 例如,aiohttp 依赖于 loop.create_connection 协程来管理 TCP 连接


2
投票

@Vincent 的答案可能就是您想要的,因为它会让您使用

__await__
方法来调用更多异步代码。

但是,稍微搜索一下 python 的 Tasks 实现,就会发现 async 返回的迭代器应该返回 None(将控制发送回事件循环),或者引发一个异常,其值就是它们的返回值。

所以,以下工作:

class MyIter:
    def __init__(self):
        self.n = 2
    def __next__(self):
        self.n -= 1
        if self.n >= 0:
            return None # yields control to outer loop
        raise StopIteration(12) # "return" value stored in exc.value

class MyTask:
    def __await__(self):
        return MyIter()

async def test():
    ans = await MyTask()
    print(ans)

0
投票

所有 asyncio 都是通过顺序任务迭代构建的。您可以使用

__await__
方法创建自定义迭代器/生成器,或者您可以通过老式方式创建协程:

import types, asyncio


@types.coroutine
def arange(start):
    numbers = []
    for i in range(start):
        numbers.append(i)
        yield  # always should be None
    return numbers


async def main():
    return await arange(10)


print(asyncio.run(main()))

如果你想制作异步发电机,请阅读this

为了更好地理解,您可以将 asyncio event_loop 想象为运行生成器的双端队列,这是一个简单的示例:

from collections import deque


def arange(stop):
    for i in range(1, stop + 1, 1):
        print(i, "/", stop)
        yield
    else:
        return i


def gather(*tasks):
    return_values = {task: None for task in tasks}
    events = deque(tasks)

    while events:
        try:
            task = events.pop()
            next(task)
            events.appendleft(task)
        except StopIteration as exc:
            return_values[task] = exc.value

    return tuple(return_values.values())


result1, result2 = gather(arange(3), arange(5))
print(result1, result2)

结果与您使用

arange
声明
types.coroutine
并使用
await asyncio.gather(*tasks)

调用这些等待项相同
© www.soinside.com 2019 - 2024. All rights reserved.