在Python中实现服务器和客户端通过套接字通信

问题描述 投票:0回答:1

我需要在Python中实现通过套接字进行通信的服务器和客户端。我决定用

asyncio.start_server
来实现服务器。客户端通过套接字连接到服务器。服务器随机生成一些消息并立即将这些消息推送到客户端。所有客户应尽快收到相同的消息并打印它们。

我的服务器实现:

import asyncio
import datetime as dt
import struct
import sys

from random import randrange
from zoneinfo import ZoneInfo


async def prepare_weather_data():
    while True:
        await asyncio.sleep(randrange(0, 7))

        now = dt.datetime.now(ZoneInfo("Europe/Kyiv")).replace(microsecond=0).isoformat()
        massage = f"{now} The temperature is {str(randrange(20, 30))} degrees Celsius"

        message_encoded = massage.encode()
        message_length = struct.pack('>I', len(message_encoded))

        yield message_length + message_encoded


async def read_massage(reader: asyncio.StreamReader):
    # Read the 4-byte length header
    length_data = await reader.readexactly(4)
    message_length = struct.unpack('>I', length_data)[0]
    message_encoded = await reader.readexactly(message_length)
    return message_encoded.decode()


async def handle_client(
    _: asyncio.StreamReader,
    writer: asyncio.StreamWriter,
    weather_data: bytes,
):
    try:
        writer.write(weather_data)
        await writer.drain()
    except KeyboardInterrupt:
        print("Closing the connection")
        writer.close()
        sys.exit(0)


async def main():
    async def client_handler(reader, writer):
        message = await read_massage(reader)
        print(f"Received the message \"{message}\" that won't be handled any way")

        async for weather_data in prepare_weather_data():
            await handle_client(reader, writer, weather_data)

    server = await asyncio.start_server(client_handler, "localhost", 8000)
    async with server:
        await server.serve_forever()


if __name__ == "__main__":
    asyncio.run(main())

客户:

import sys
import socket
import struct


def recvall(sock: socket.socket, n: int) -> bytes | None:
    # Helper function to recv n bytes or return None if EOF is hit
    data = bytearray()
    while len(data) < n:
        packet = sock.recv(n - len(data))
        if not packet:
            return None
        data.extend(packet)
    return data


def read_massage(sock: socket.socket) -> str | None:
    # Read the 4-byte length
    message_length_encoded = recvall(sock, 4)
    if not message_length_encoded:
        print("Socket closed")
        sys.exit(0)

    message_length = struct.unpack('>I', message_length_encoded)[0]
    return recvall(sock, message_length).decode()


so = socket.socket(
    socket.AF_INET,
    socket.SOCK_STREAM,
)

so.connect(("localhost", 8000))  # Blocking
print("Connected")

try:
    message = "Hello, give me a weather, please".encode()

    # Pack the length of the message into 4 bytes (big-endian)
    message_length = struct.pack('>I', len(message))
    sent = so.send(message_length + message)  # Blocking
    print(f"Sent {sent} bytes")

    while True:
        msg = read_massage(so)
        print(msg)

except KeyboardInterrupt:
    so.close()
    print("Socket closed")
    sys.exit(0)

这个版本的服务器基本可以运行。服务器生成消息并将其推送给客户端。然而,所有客户端都会收到不同的消息!如何修改服务器向所有客户端推送相同的消息?我已经尽力了。

UPD

我看到两种可能的方法。

  1. 将生成的数据存储在所有客户端都可以访问的某个容器中。 像这样:

     async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter, weather_data):
         _ = await read_message(reader)
         try:
             while True:
                 async for data in weather_data:
                     writer.write(data)
                 await writer.drain()
         except KeyboardInterrupt:
             print("Closing the connection")
             writer.close()
             sys.exit(0)
    
     async def main():
         weather_data = prepare_weather_data()
    
         async def client_handler(reader, writer):
             await handle_client(reader, writer, weather_data)
    
         server = await asyncio.start_server(client_handler, "localhost", 8000)
         async with server:
             await server.serve_forever()
    

但是,第二个连接的客户端会崩溃,如下所示

asynchronous generator is already running

  1. 注册所有连接的客户端并将生成的数据推送给它们。我尝试将所有连接的客户端设置为
    asyncio.all_tasks()
    。不过我不知道怎么用。
python sockets asynchronous python-asyncio
1个回答
0
投票

服务器部分可能会通过以下方式进行更改:

import asyncio
import datetime as dt
import struct

from random import randrange
from zoneinfo import ZoneInfo

writers = []
all_writers = {}
weather = b''


async def prepare_weather_data(t):
    global weather, writers
    try:
        while True:
            await asyncio.sleep(randrange(0, 7))
            now = dt.datetime.now(ZoneInfo("Europe/Kyiv")).replace(microsecond=0).isoformat()
            message = f"{now} The temperature is {str(randrange(20, 30))} degrees Celsius"

            message_encoded = message.encode()
            message_length = struct.pack('>I', len(message_encoded))

            weather = message_length + message_encoded

            writers = []
            print(message)
    except asyncio.CancelledError:
        t.cancel()


async def read_message(reader: asyncio.StreamReader):
    # Read the 4-byte length header
    length_data = await reader.readexactly(4)
    message_length = struct.unpack('>I', length_data)[0]
    message_encoded = await reader.readexactly(message_length)
    return message_encoded.decode()


async def handle_client(
    writer: asyncio.StreamWriter,
):
    socname = writer.get_extra_info('peername')
    try:
        while True:
            # todo: refactor to avoid sleep
            await asyncio.sleep(0)
            if socname not in writers:
                writers.append(socname)
                writer.write(weather)
                await writer.drain()
    except* ConnectionResetError:
        del all_writers[socname]
        writer.close()
        print(f'Socket {socname} closed')
    except* asyncio.CancelledError:
        pass
    except* Exception as e:
        print(f'ERROR: {str(e)}')
        raise e


async def client_handler(reader, writer):
    socname = writer.get_extra_info('peername')
    all_writers[socname] = writer
    message = await read_message(reader)
    print(f"Socket {socname} connected. Received message: \"{message}\"")
    await handle_client(writer)


async def run_server():
    server = await asyncio.start_server(client_handler, "localhost", 8000)
    print(f'Serving on {server.sockets[0].getsockname()}')
    async with server:
        try:
            await server.serve_forever()
        finally:
            for writer in all_writers.values():
                writer.close()

async def main():
    try:
        async with asyncio.TaskGroup() as group:
            t = group.create_task(run_server())
            group.create_task(prepare_weather_data(t))
    except* asyncio.CancelledError:
        print('Stopped by user')


if __name__ == "__main__":
    asyncio.run(main())

这不是一个完美的解决方案,但它是一个可行的解决方案。从我的角度来看,如果

可能会有所改善
  1. 避免协程切换
    await asyncio.sleep(0)
  2. 尽可能避免使用全局变量
  3. 避免使用
    t.cancel()
    显式取消任务;按照文档
  4. 中的建议添加异常引发任务
  5. 消除服务器端通知客户端断开连接时约 2 秒的延迟;即打印延迟
    f'Socket {socname} closed'

我尝试改进脚本,但尚未找到有效的改进。我将不胜感激您的想法。

最新问题
© www.soinside.com 2019 - 2024. All rights reserved.