Python 异步代码:
import asyncio
from asyncio import StreamReader, StreamWriter
# send messages with nc localhost 7000
async def echo(reader: StreamReader, writer: StreamWriter):
peername = writer.get_extra_info('peername')
connection_id = f"{peername[0]}:{peername[1]}" # Using peername as connection ID
print(f'Open connection to {connection_id=}')
while data := await reader.readline():
print(f'Received data {data=}')
data = data.decode()
writer.write(f'Return as uppercase:{data.upper()}'.encode())
await writer.drain()
async def main(host='127.0.0.1', port=7000):
print(f'Server running on {host}:{port}')
server = await asyncio.start_server(echo, host, port)
async with server:
await server.serve_forever()
if __name__ == "__main__":
asyncio.run(main())
如何在上面的代码中添加信号处理,从正常关闭(CTRL+C(SIGINIT) 和 SIGTERM)?
主要是我想在
writer
中关闭 echo
,但请记住,在 async def
之外还有其他 await asyncio.start_server(echo, host, port)
我只是从简单的示例开始。
CancelledError
(除非有充分的理由,否则不要“吞掉”它):async def worker():
try:
... do work here ...
except asyncio.CancelledError:
... do cleanup here ...
raise
如果您有更多任务,请使用任务组来运行所有任务(工作人员)。只有当所有成员都退出后它才会退出。
为您想要的信号安装处理程序,并将捕获的信号转换为取消。
这是一个简单的演示:
import asyncio, signal
async def worker():
try:
while True:
print("working")
await asyncio.sleep(1)
except asyncio.CancelledError:
print("cleaning up")
raise
def set_signal_handler(signum, task_to_cancel):
def handler(_signum, _frame):
asyncio.get_running_loop().call_soon_threadsafe(task_to_cancel.cancel)
signal.signal(signum, handler)
async def main():
this_task = asyncio.current_task()
try:
set_signal_handler(signal.SIGTERM, task_to_cancel=this_task)
async with asyncio.TaskGroup() as tg:
tg.create_task(worker())
#tg.create_task(worker2())
#tg.create_task(worker3())
except asyncio.CancelledError:
pass
# TODO: reset the signal handler(s)
if __name__ == '__main__':
asyncio.run(main())
信号 (ctrl-C) 将取消与任务组一起运行的任务
main
。取消的任务组会取消其所有成员。然后,CancelledError
就会停止从任务组传播。
唯一的“技巧”是在处理程序中使用
call_soon_threadsafe
。不涉及任何线程,但信号可能随时出现。信号处理程序的代码不得干扰刚刚中断的代码,该代码可以是包括 asyncio 内部代码在内的任何代码。 threadsafe
功能非常适合。