考虑以下代码:
main.py
import asyncio
import websockets
async def echo(websocket):
async for message in websocket:
await websocket.send(message)
print(message)
async def main():
async with websockets.serve(echo, "localhost", 8765):
await asyncio.Future() # run forever
if __name__ == '__main__':
asyncio.run(main())
其他.py
import asyncio
import json
import websockets
tasks = set()
async def run_job(i):
await asyncio.sleep(0.)
print(f"I'm job number {i}")
async def bunch_of_tasks(ws):
for i in range(10):
task = asyncio.create_task(run_job(i), name=f'job-{i}')
tasks.add(task)
task.add_done_callback(tasks.discard)
print(f'had a nice sleep! now my value is {i}')
# await asyncio.sleep(0.)
await ws.send(json.dumps('hello there!'))
await asyncio.gather(*tasks)
print(f'tasks done')
async def do_stuff():
async with websockets.connect("ws://localhost:8766") as websocket:
await bunch_of_tasks(websocket)
await websocket.recv()
if __name__ == '__main__':
asyncio.run(do_stuff())
首先运行
main.py
然后并行运行other.py
,我得到:
had a nice sleep! now my value is 0
had a nice sleep! now my value is 1
had a nice sleep! now my value is 2
had a nice sleep! now my value is 3
had a nice sleep! now my value is 4
had a nice sleep! now my value is 5
had a nice sleep! now my value is 6
had a nice sleep! now my value is 7
had a nice sleep! now my value is 8
had a nice sleep! now my value is 9
I'm job number 0
I'm job number 1
I'm job number 2
I'm job number 3
I'm job number 4
I'm job number 5
I'm job number 6
I'm job number 7
I'm job number 8
I'm job number 9
tasks done
但是如果我在
await asyncio.sleep(0.)
之前取消注释await ws.send(json.dumps('hello there!'))
,我得到:
had a nice sleep! now my value is 0
had a nice sleep! now my value is 1
I'm job number 0
had a nice sleep! now my value is 2
I'm job number 1
had a nice sleep! now my value is 3
I'm job number 2
had a nice sleep! now my value is 4
I'm job number 3
had a nice sleep! now my value is 5
I'm job number 4
had a nice sleep! now my value is 6
I'm job number 5
had a nice sleep! now my value is 7
I'm job number 6
had a nice sleep! now my value is 8
I'm job number 7
had a nice sleep! now my value is 9
I'm job number 8
I'm job number 9
tasks done
这正是我所期望的。
所以显然将消息发送到 web socket 不会让出控制权给事件循环,并且
run_job
协程没有机会运行。但是,asyncio.sleep
有效地暂停了当前任务,并为 run_job
提供了执行的机会。
为什么会这样?
TL;DR:确实 -
await
本身并不足以保证将控制权传递回异步循环的协作部分。为确保这一点,必须在调用链下游的某处安排一个低级回调(asyncio.sleep
就是这样做的)。
长答案:
我不得不调试它 - 碰巧的是,尽管
websockets
客户端是异步的,但它归结为立即将它发送的所有数据写入 Selector 套接字 - 因为它在其他方面不受约束。
换句话说,
ws.send
最终会同步调用这一行:https://github.com/python/cpython/blob/f2e5a6ee628502d307a97f587788d7022a200229/Lib/asyncio/selector_events.py#L1071
然后,最大的惊喜是对于原始协程(未包装在任务或期货中的协程),无论何时它们返回它们的值,执行都不会屈服于异步循环 - 即使它们包含其他“等待” " 语句,如果嵌套的等待协程实际上不会在等待中“阻塞”,则 asyncio 循环永远不会返回。 在内部,在处理协程的 C 代码中,每当协程在发送后实际“阻塞”时,就会安排回调:此回调包装在 Python
asyncio.events.Handle
对象中,然后将控制权返回给 asyncio循环。
此 C 函数中的代码:https://github.com/python/cpython/blob/f2e5a6ee628502d307a97f587788d7022a200229/Modules/_asynciomodule.c#L2696。如果你跟着那里,你可以看到如果协程返回一个值,它的结果被设置在低级对象中。如果它从
send
返回另一个可等待对象,则该等待对象被安排在循环中,并且函数返回。
它可能是(并且可能是)asyncio Loops 的实现选择:出于性能原因,任何实际同步解析的所谓协程都是同步运行的。
只有当在 await 中调用的任何嵌套协同例程最终通过回调调度未来时,在叶调用中(
asyncio.sleep
确实如此),asyncio 默认循环才会运行所有其他就绪任务。 (在._run_once
中再次完全执行其asyncio.base_events.BaseEventLoop
方法)。