我有以下服务器代码:
import asyncio
import json
async def conn_handler(reader, writer):
addr = writer.get_extra_info('peername')
print(f"{addr} connected")
while True:
data_len = await reader.read(2)
data_len = int.from_bytes(data_len, byteorder="big")
if data_len == 0:
break
if data := await reader.readexactly(data_len):
msg = json.loads(data)
print(f"received: {msg}")
msg = json.dumps(msg)
msg_len = len(msg).to_bytes(2, byteorder="big")
writer.write(msg_len + msg.encode())
await writer.drain()
print(f"sent: {msg}")
print(f"{addr} closed")
async def start_server():
server = await asyncio.start_server(conn_handler, "0.0.0.0", 10999)
print(f'Serving on {server.sockets[0].getsockname()}')
async with server:
await server.serve_forever()
try:
asyncio.run(start_server())
except KeyboardInterrupt:
print("keyboard interrupt occured")
以及以下客户端代码:
import asyncio
import json
import logging
import pprint
import uuid
outstanding_msgs = []
msgs_count = 0
async def send_msgs_loop(writer):
with open("msgs.json") as f:
msgs = json.load(f)
global msgs_count
msgs_count = len(msgs)
for msg in msgs:
msg["id"] = str(uuid.uuid4())
outstanding_msgs.append(msg["id"])
msg = json.dumps(msg)
msg_len = len(msg)
msg_len = msg_len.to_bytes(2, byteorder="big")
writer.write(msg_len + msg.encode())
await writer.drain()
print(f"sent: {msg}")
async def read_msgs_loop(reader):
received_count = 0
while True:
msg_len = await reader.read(2)
msg_len = int.from_bytes(msg_len, byteorder="big")
if msg_len == 0:
break
if msg := await reader.readexactly(msg_len):
msg = json.loads(msg)
if msg["id"] in outstanding_msgs:
received_count = received_count + 1
outstanding_msgs.remove(msg["id"])
print(f"received: {msg}")
if not outstanding_msgs and received_count == msgs_count:
print("all responses received")
break
async def start_client():
reader, writer = await asyncio.open_connection("localhost", 10999)
await asyncio.gather(send_msgs_loop(writer), read_msgs_loop(reader))
writer.close()
await writer.wait_closed()
if __name__ == '__main__':
asyncio.run(start_client(), debug=False)
我需要一些指导,如何在键盘中断时在服务器上实现正常关闭。
当没有连接的客户端时,尝试 except on
asyncio.run
工作正常。但是,如果有连接的客户端,我会在 conn_handler 中遇到异常。通常,它看起来像这样:
future: <Task finished name='Task-5' coro=<conn_handler() done, defined at C:\Users\tsku1460\myPython\pycon_2024\async_server.py:4> exception=KeyboardInte
rrupt()>
Traceback (most recent call last):
File "C:\Users\tsku1460\myPython\pycon_2024\async_server.py", line 40, in <module>
asyncio.run(start_server())
File "C:\Program Files\Python310\lib\asyncio\runners.py", line 44, in run
return loop.run_until_complete(main)
File "C:\Program Files\Python310\lib\asyncio\base_events.py", line 636, in run_until_complete
self.run_forever()
File "C:\Program Files\Python310\lib\asyncio\windows_events.py", line 321, in run_forever
super().run_forever()
File "C:\Program Files\Python310\lib\asyncio\base_events.py", line 603, in run_forever
self._run_once()
File "C:\Program Files\Python310\lib\asyncio\base_events.py", line 1909, in _run_once
handle._run()
File "C:\Program Files\Python310\lib\asyncio\events.py", line 80, in _run
self._context.run(self._callback, *self._args)
File "C:\Users\tsku1460\myPython\pycon_2024\async_server.py", line 25, in conn_handler
print(f"sent: {msg}")
KeyboardInterrupt
如果我将 try-except 放在
conn_handler
中,那么我需要按 CTRL+C 两次(一次被 conn_handler 捕获,一次被 asyncio.run 周围的主 try-catch 块捕获)。如果我像这样从 conn_handler 传播异常:
except KeyboardInterrupt as e:
raise
然后我明白了:
keyboard interrupt occured
Task exception was never retrieved
future: <Task finished name='Task-5' coro=<conn_handler() done, defined at C:\Users\tsku1460\myPython\pycon_2024\async_server.py:4> exception=KeyboardInte
rrupt()>
Traceback (most recent call last):
File "C:\Users\tsku1460\myPython\pycon_2024\async_server.py", line 41, in <module>
asyncio.run(start_server())
File "C:\Program Files\Python310\lib\asyncio\runners.py", line 44, in run
return loop.run_until_complete(main)
File "C:\Program Files\Python310\lib\asyncio\base_events.py", line 636, in run_until_complete
self.run_forever()
File "C:\Program Files\Python310\lib\asyncio\windows_events.py", line 321, in run_forever
super().run_forever()
File "C:\Program Files\Python310\lib\asyncio\base_events.py", line 603, in run_forever
self._run_once()
File "C:\Program Files\Python310\lib\asyncio\base_events.py", line 1909, in _run_once
handle._run()
File "C:\Program Files\Python310\lib\asyncio\events.py", line 80, in _run
self._context.run(self._callback, *self._args)
File "C:\Users\tsku1460\myPython\pycon_2024\async_server.py", line 18, in conn_handler
print(f"received: {msg}")
KeyboardInterrupt
因此异常被传播到主 try-catch 块(我们可以看到文本“发生键盘中断”),但仍然有问题。
这是 msgs.json 的示例内容:
[
{"amount": 10, "card": "1213212312", "terminal": "ABC"},
{"amount": 25, "card": "5555555552", "terminal": "CDE"},
{"amount": 30, "card": "4444444442", "terminal": "EFG"},
{"amount": 10, "card": "1213212312", "terminal": "ABC"},
{"amount": 25, "card": "5555555552", "terminal": "CDE"},
{"amount": 30, "card": "4444444442", "terminal": "EFG"},
{"amount": 10, "card": "1213212312", "terminal": "ABC"},
{"amount": 25, "card": "5555555552", "terminal": "CDE"},
{"amount": 30, "card": "4444444442", "terminal": "EFG"},
{"amount": 10, "card": "1213212312", "terminal": "ABC"},
{"amount": 25, "card": "5555555552", "terminal": "CDE"},
{"amount": 30, "card": "4444444442", "terminal": "EFG"},
{"amount": 10, "card": "1213212312", "terminal": "ABC"},
{"amount": 25, "card": "5555555552", "terminal": "CDE"},
{"amount": 30, "card": "4444444442", "terminal": "EFG"},
{"amount": 10, "card": "1213212312", "terminal": "ABC"},
{"amount": 25, "card": "5555555552", "terminal": "CDE"},
{"amount": 30, "card": "4444444442", "terminal": "EFG"},
{"amount": 10, "card": "1213212312", "terminal": "ABC"},
{"amount": 25, "card": "5555555552", "terminal": "CDE"},
{"amount": 30, "card": "4444444442", "terminal": "EFG"},
{"amount": 10, "card": "1213212312", "terminal": "ABC"},
{"amount": 25, "card": "5555555552", "terminal": "CDE"}
]
一般可以是任何json数组。这只是一个简短的例子。我用的是一个非常大的。
我相信我已经把拼图拼凑起来了。我必须检查 asyncio 的底层工作原理。我们必须意识到的第一件事是 KeyboardInterrupt(以及 SystemExit)从任务传播到更高级别。所以它将被
asyncio.run
周围的主 try-catch 块捕获
下一个难题是
asyncio.run
实现:
如果执行
run_until_complete
时出现异常,则所有任务都会通过_cancel_all_tasks(loop)
取消
现在是最后一块。
_cancel_all_tasks
:
现在我们有了
Task exception was never retrieved
消息的来源。
我找到了两种处理此问题的方法:
将捕获键盘中断的任务保存到全局变量并重新引发异常,以便将其传播到主 try-cath 块。所有任务都将被取消(将通过 asyncio.run 中的 _cancel_all_tasks 确保)。您唯一要做的就是从我们之前保存的任务中检索 KeyboardInterrupt 异常。
导入异步 导入 json
异常任务=无
async def conn_handler(reader, writer): 尝试: addr = writer.get_extra_info('peername') print(f"{addr} 已连接")
while True:
data_len = await reader.read(2)
data_len = int.from_bytes(data_len, byteorder="big")
if data_len == 0:
break
if data := await reader.readexactly(data_len):
msg = json.loads(data)
print(f"received: {msg}")
msg = json.dumps(msg)
msg_len = len(msg).to_bytes(2, byteorder="big")
writer.write(msg_len + msg.encode())
await writer.drain()
print(f"sent: {msg}")
print(f"{addr} closed")
except KeyboardInterrupt:
global exception_task
exception_task = asyncio.current_task()
raise
async def start_server(): 服务器 = 等待 asyncio.start_server(conn_handler, "0.0.0.0", 10999)
print(f'Serving on {server.sockets[0].getsockname()}')
async with server:
await server.serve_forever()
尝试: asyncio.run(启动服务器()) 除了键盘中断: 异常_任务.异常() print("发生键盘中断")
另一个选项是设置自定义异常处理程序。有关此内容的更多信息,请参见此处: https://superfastpython.com/asyncio-task-exception-was-never-retrieved/
大家请告诉我是否有另一种(更好的)方法来优雅地关闭异步套接字服务器。