Python 异步信号处理

问题描述 投票:0回答:1

Python 异步代码:

import asyncio
from asyncio import StreamReader, StreamWriter

# send messages with nc localhost 7000
async def echo(reader: StreamReader, writer: StreamWriter): 
    peername = writer.get_extra_info('peername')
    connection_id = f"{peername[0]}:{peername[1]}"  # Using peername as connection ID
    print(f'Open connection to {connection_id=}')

    while data := await reader.readline():
        print(f'Received data {data=}')
        data = data.decode()
        writer.write(f'Return as uppercase:{data.upper()}'.encode())
        await writer.drain()


async def main(host='127.0.0.1', port=7000):
    print(f'Server running on {host}:{port}')
    server = await asyncio.start_server(echo, host, port)
    async with server:
        await server.serve_forever()


if __name__ == "__main__":
    asyncio.run(main())

如何在上面的代码中添加信号处理,从正常关闭(CTRL+C(SIGINIT) 和 SIGTERM)?

主要是我想在

writer
中关闭
echo
,但请记住,在
async def
之外还有其他
await asyncio.start_server(echo, host, port)
我只是从简单的示例开始。

python-asyncio signals
1个回答
0
投票
  1. 允许受控清理的基本模式是捕获
    CancelledError
    (除非有充分的理由,否则不要“吞掉”它):
async def worker():
    try:
        ... do work here ...
    except asyncio.CancelledError:
        ... do cleanup here ...
        raise
  1. 如果您有更多任务,请使用任务组来运行所有任务(工作人员)。只有当所有成员都退出后它才会退出。

  2. 为您想要的信号安装处理程序,并将捕获的信号转换为取消。

这是一个简单的演示:

import asyncio, signal

async def worker():
    try:
        while True:
            print("working")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("cleaning up")
        raise

def set_signal_handler(signum, task_to_cancel):
    def handler(_signum, _frame):
        asyncio.get_running_loop().call_soon_threadsafe(task_to_cancel.cancel)
    signal.signal(signum, handler)

async def main():
    this_task = asyncio.current_task()
    try:
        set_signal_handler(signal.SIGTERM, task_to_cancel=this_task)
        async with asyncio.TaskGroup() as tg: 
            tg.create_task(worker())
            #tg.create_task(worker2())
            #tg.create_task(worker3())
    except asyncio.CancelledError:
        pass
    # TODO: reset the signal handler(s)

if __name__ == '__main__':
    asyncio.run(main())

信号 (ctrl-C) 将取消与任务组一起运行的任务

main
。取消的任务组会取消其所有成员。然后,
CancelledError
就会停止从任务组传播。

唯一的“技巧”是在处理程序中使用

call_soon_threadsafe
。不涉及任何线程,但信号可能随时出现。信号处理程序的代码不得干扰刚刚中断的代码,该代码可以是包括 asyncio 内部代码在内的任何代码。
threadsafe
功能非常适合。

© www.soinside.com 2019 - 2024. All rights reserved.