为什么向 Web 套接字发送消息不会让出对事件循环的控制?

问题描述 投票:0回答:1

考虑以下代码:

main.py

import asyncio
import websockets


async def echo(websocket):
    async for message in websocket:
        await websocket.send(message)
        print(message)


async def main():
    async with websockets.serve(echo, "localhost", 8765):
        await asyncio.Future()  # run forever

if __name__ == '__main__':
    asyncio.run(main())

其他.py

import asyncio
import json

import websockets

tasks = set()


async def run_job(i):
    await asyncio.sleep(0.)
    print(f"I'm job number {i}")


async def bunch_of_tasks(ws):
    for i in range(10):
        task = asyncio.create_task(run_job(i), name=f'job-{i}')
        tasks.add(task)
        task.add_done_callback(tasks.discard)
        print(f'had a nice sleep! now my value is {i}')
        # await asyncio.sleep(0.)
        await ws.send(json.dumps('hello there!'))
    await asyncio.gather(*tasks)
    print(f'tasks done')


async def do_stuff():
    async with websockets.connect("ws://localhost:8766") as websocket:
        await bunch_of_tasks(websocket)
        await websocket.recv()

if __name__ == '__main__':
    asyncio.run(do_stuff())

首先运行

main.py
然后并行运行
other.py
,我得到:

had a nice sleep! now my value is 0
had a nice sleep! now my value is 1
had a nice sleep! now my value is 2
had a nice sleep! now my value is 3
had a nice sleep! now my value is 4
had a nice sleep! now my value is 5
had a nice sleep! now my value is 6
had a nice sleep! now my value is 7
had a nice sleep! now my value is 8
had a nice sleep! now my value is 9
I'm job number 0
I'm job number 1
I'm job number 2
I'm job number 3
I'm job number 4
I'm job number 5
I'm job number 6
I'm job number 7
I'm job number 8
I'm job number 9
tasks done

但是如果我在

await asyncio.sleep(0.)
之前取消注释
await ws.send(json.dumps('hello there!'))
,我得到:

had a nice sleep! now my value is 0
had a nice sleep! now my value is 1
I'm job number 0
had a nice sleep! now my value is 2
I'm job number 1
had a nice sleep! now my value is 3
I'm job number 2
had a nice sleep! now my value is 4
I'm job number 3
had a nice sleep! now my value is 5
I'm job number 4
had a nice sleep! now my value is 6
I'm job number 5
had a nice sleep! now my value is 7
I'm job number 6
had a nice sleep! now my value is 8
I'm job number 7
had a nice sleep! now my value is 9
I'm job number 8
I'm job number 9
tasks done

这正是我所期望的。

所以显然将消息发送到 web socket 不会让出控制权给事件循环,并且

run_job
协程没有机会运行。但是,
asyncio.sleep
有效地暂停了当前任务,并为
run_job
提供了执行的机会。

为什么会这样?

websocket python-asyncio event-loop python-internals
1个回答
2
投票

TL;DR:确实 -

await
本身并不足以保证将控制权传递回异步循环的协作部分。为确保这一点,必须在调用链下游的某处安排一个低级回调(
asyncio.sleep
就是这样做的)。

长答案:

我不得不调试它 - 碰巧的是,尽管

websockets
客户端是异步的,但它归结为立即将它发送的所有数据写入 Selector 套接字 - 因为它在其他方面不受约束。

换句话说,

ws.send
最终会同步调用这一行:https://github.com/python/cpython/blob/f2e5a6ee628502d307a97f587788d7022a200229/Lib/asyncio/selector_events.py#L1071

然后,最大的惊喜是对于原始协程(未包装在任务或期货中的协程),无论何时它们返回它们的值,执行都不会屈服于异步循环 - 即使它们包含其他“等待” " 语句,如果嵌套的等待协程实际上不会在等待中“阻塞”,则 asyncio 循环永远不会返回。 在内部,在处理协程的 C 代码中,每当协程在发送后实际“阻塞”时,就会安排回调:此回调包装在 Python

asyncio.events.Handle
对象中,然后将控制权返回给 asyncio循环。

此 C 函数中的代码:https://github.com/python/cpython/blob/f2e5a6ee628502d307a97f587788d7022a200229/Modules/_asynciomodule.c#L2696。如果你跟着那里,你可以看到如果协程返回一个值,它的结果被设置在低级对象中。如果它从

send
返回另一个可等待对象,则该等待对象被安排在循环中,并且函数返回。

它可能是(并且可能是)asyncio Loops 的实现选择:出于性能原因,任何实际同步解析的所谓协程都是同步运行的。

只有当在 await 中调用的任何嵌套协同例程最终通过回调调度未来时,在叶调用中(

asyncio.sleep
确实如此),asyncio 默认循环才会运行所有其他就绪任务。 (在
._run_once
中再次完全执行其
asyncio.base_events.BaseEventLoop
方法)。

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.