如何在键盘中断时实现异步套接字服务器的正常关闭?

问题描述 投票:0回答:1

我有以下服务器代码:

import asyncio
import json

async def conn_handler(reader, writer):

            addr = writer.get_extra_info('peername')
            print(f"{addr} connected")

            while True:
                    data_len = await reader.read(2)
                    data_len = int.from_bytes(data_len, byteorder="big")

                    if data_len == 0:
                        break

                    if data := await reader.readexactly(data_len):
                        msg = json.loads(data)
                        print(f"received: {msg}")
                        
                        msg = json.dumps(msg)                        
                        msg_len = len(msg).to_bytes(2, byteorder="big")

                        writer.write(msg_len + msg.encode())
                        await writer.drain()
                        print(f"sent: {msg}")

            print(f"{addr} closed")


async def start_server():
    server = await asyncio.start_server(conn_handler, "0.0.0.0", 10999)

    print(f'Serving on {server.sockets[0].getsockname()}')


    async with server:
        await server.serve_forever()

try:
    asyncio.run(start_server())
except KeyboardInterrupt:
    print("keyboard interrupt occured")

以及以下客户端代码:

import asyncio
import json
import logging
import pprint
import uuid


outstanding_msgs = []
msgs_count = 0

async def send_msgs_loop(writer):
    with open("msgs.json") as f:
        msgs = json.load(f)

        global msgs_count
        msgs_count = len(msgs)

        for msg in msgs:

            msg["id"] = str(uuid.uuid4())
            outstanding_msgs.append(msg["id"])

            msg = json.dumps(msg)

            msg_len = len(msg)
            msg_len = msg_len.to_bytes(2, byteorder="big")

            writer.write(msg_len + msg.encode())
            await writer.drain()

            print(f"sent: {msg}")

async def read_msgs_loop(reader):
    received_count = 0
    while True:
        msg_len = await reader.read(2)
        msg_len = int.from_bytes(msg_len, byteorder="big")

        if msg_len == 0:
            break

        if msg := await reader.readexactly(msg_len):
            msg = json.loads(msg)
            if msg["id"] in outstanding_msgs:
                received_count = received_count + 1
                outstanding_msgs.remove(msg["id"])
                print(f"received: {msg}")

        if not outstanding_msgs and received_count == msgs_count:
            print("all responses received")
            break

async def start_client():
    reader, writer = await asyncio.open_connection("localhost", 10999)

    await asyncio.gather(send_msgs_loop(writer), read_msgs_loop(reader))

    writer.close()
    await writer.wait_closed()

if __name__ == '__main__':    
    asyncio.run(start_client(), debug=False)

我需要一些指导,如何在键盘中断时在服务器上实现正常关闭。

当没有连接的客户端时,尝试 except on

asyncio.run
工作正常。但是,如果有连接的客户端,我会在 conn_handler 中遇到异常。通常,它看起来像这样:

future: <Task finished name='Task-5' coro=<conn_handler() done, defined at C:\Users\tsku1460\myPython\pycon_2024\async_server.py:4> exception=KeyboardInte
rrupt()>
Traceback (most recent call last):
  File "C:\Users\tsku1460\myPython\pycon_2024\async_server.py", line 40, in <module>
    asyncio.run(start_server())
  File "C:\Program Files\Python310\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "C:\Program Files\Python310\lib\asyncio\base_events.py", line 636, in run_until_complete
    self.run_forever()
  File "C:\Program Files\Python310\lib\asyncio\windows_events.py", line 321, in run_forever
    super().run_forever()
  File "C:\Program Files\Python310\lib\asyncio\base_events.py", line 603, in run_forever
    self._run_once()
  File "C:\Program Files\Python310\lib\asyncio\base_events.py", line 1909, in _run_once
    handle._run()
  File "C:\Program Files\Python310\lib\asyncio\events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "C:\Users\tsku1460\myPython\pycon_2024\async_server.py", line 25, in conn_handler
    print(f"sent: {msg}")
KeyboardInterrupt

如果我将 try-except 放在

conn_handler
中,那么我需要按 CTRL+C 两次(一次被 conn_handler 捕获,一次被 asyncio.run 周围的主 try-catch 块捕获)。如果我像这样从 conn_handler 传播异常:

   except KeyboardInterrupt as e:
        raise

然后我明白了:

keyboard interrupt occured
Task exception was never retrieved
future: <Task finished name='Task-5' coro=<conn_handler() done, defined at C:\Users\tsku1460\myPython\pycon_2024\async_server.py:4> exception=KeyboardInte
rrupt()>
Traceback (most recent call last):
  File "C:\Users\tsku1460\myPython\pycon_2024\async_server.py", line 41, in <module>
    asyncio.run(start_server())
  File "C:\Program Files\Python310\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "C:\Program Files\Python310\lib\asyncio\base_events.py", line 636, in run_until_complete
    self.run_forever()
  File "C:\Program Files\Python310\lib\asyncio\windows_events.py", line 321, in run_forever
    super().run_forever()
  File "C:\Program Files\Python310\lib\asyncio\base_events.py", line 603, in run_forever
    self._run_once()
  File "C:\Program Files\Python310\lib\asyncio\base_events.py", line 1909, in _run_once
    handle._run()
  File "C:\Program Files\Python310\lib\asyncio\events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "C:\Users\tsku1460\myPython\pycon_2024\async_server.py", line 18, in conn_handler
    print(f"received: {msg}")
KeyboardInterrupt

因此异常被传播到主 try-catch 块(我们可以看到文本“发生键盘中断”),但仍然有问题。

这是 msgs.json 的示例内容:

[
        {"amount": 10, "card": "1213212312", "terminal": "ABC"},
        {"amount": 25, "card": "5555555552", "terminal": "CDE"},
        {"amount": 30, "card": "4444444442", "terminal": "EFG"},
        {"amount": 10, "card": "1213212312", "terminal": "ABC"},
        {"amount": 25, "card": "5555555552", "terminal": "CDE"},
        {"amount": 30, "card": "4444444442", "terminal": "EFG"},
        {"amount": 10, "card": "1213212312", "terminal": "ABC"},
        {"amount": 25, "card": "5555555552", "terminal": "CDE"},
        {"amount": 30, "card": "4444444442", "terminal": "EFG"},
        {"amount": 10, "card": "1213212312", "terminal": "ABC"},
        {"amount": 25, "card": "5555555552", "terminal": "CDE"},
        {"amount": 30, "card": "4444444442", "terminal": "EFG"},
        {"amount": 10, "card": "1213212312", "terminal": "ABC"},
        {"amount": 25, "card": "5555555552", "terminal": "CDE"},
        {"amount": 30, "card": "4444444442", "terminal": "EFG"},
        {"amount": 10, "card": "1213212312", "terminal": "ABC"},
        {"amount": 25, "card": "5555555552", "terminal": "CDE"},
        {"amount": 30, "card": "4444444442", "terminal": "EFG"},
        {"amount": 10, "card": "1213212312", "terminal": "ABC"},
        {"amount": 25, "card": "5555555552", "terminal": "CDE"},
        {"amount": 30, "card": "4444444442", "terminal": "EFG"},
        {"amount": 10, "card": "1213212312", "terminal": "ABC"},
        {"amount": 25, "card": "5555555552", "terminal": "CDE"}
]

一般可以是任何json数组。这只是一个简短的例子。我用的是一个非常大的。

python try-catch python-asyncio keyboardinterrupt
1个回答
0
投票

我相信我已经把拼图拼凑起来了。我必须检查 asyncio 的底层工作原理。我们必须意识到的第一件事是 KeyboardInterrupt(以及 SystemExit)从任务传播到更高级别。所以它将被

asyncio.run
周围的主 try-catch 块捕获

下一个难题是

asyncio.run
实现:

如果执行

run_until_complete
时出现异常,则所有任务都会通过
_cancel_all_tasks(loop)

取消

现在是最后一块。

_cancel_all_tasks

现在我们有了

Task exception was never retrieved
消息的来源。

我找到了两种处理此问题的方法:

  • 将捕获键盘中断的任务保存到全局变量并重新引发异常,以便将其传播到主 try-cath 块。所有任务都将被取消(将通过 asyncio.run 中的 _cancel_all_tasks 确保)。您唯一要做的就是从我们之前保存的任务中检索 KeyboardInterrupt 异常。

    导入异步 导入 json

    异常任务=无

    async def conn_handler(reader, writer): 尝试: addr = writer.get_extra_info('peername') print(f"{addr} 已连接")

          while True:
              data_len = await reader.read(2)
              data_len = int.from_bytes(data_len, byteorder="big")
    
              if data_len == 0:
                  break
    
              if data := await reader.readexactly(data_len):
                  msg = json.loads(data)
                  print(f"received: {msg}")
    
                  msg = json.dumps(msg)
                  msg_len = len(msg).to_bytes(2, byteorder="big")
    
                  writer.write(msg_len + msg.encode())
                  await writer.drain()
                  print(f"sent: {msg}")
    
          print(f"{addr} closed")
      except KeyboardInterrupt:
          global exception_task
          exception_task = asyncio.current_task()
          raise
    

    async def start_server(): 服务器 = 等待 asyncio.start_server(conn_handler, "0.0.0.0", 10999)

      print(f'Serving on {server.sockets[0].getsockname()}')
    
      async with server:
          await server.serve_forever()
    

    尝试: asyncio.run(启动服务器()) 除了键盘中断: 异常_任务.异常() print("发生键盘中断")

  • 另一个选项是设置自定义异常处理程序。有关此内容的更多信息,请参见此处: https://superfastpython.com/asyncio-task-exception-was-never-retrieved/

大家请告诉我是否有另一种(更好的)方法来优雅地关闭异步套接字服务器。

© www.soinside.com 2019 - 2024. All rights reserved.