我需要在 Python 中实现通过套接字进行通信的服务器和客户端。我决定用 `asyncio.start_server` 来实现服务器。客户端通过套接字连接到服务器。服务器随机生成一些消息,并立即将这些消息推送给客户端。所有客户端都应尽快收到相同的消息并将其打印出来。
我的服务器实现:
import asyncio
import datetime as dt
import struct
import sys
from random import randrange
from zoneinfo import ZoneInfo
async def prepare_weather_data(delay_bound: int = 7):
    """Yield an endless stream of length-prefixed weather messages.

    Before each message the generator sleeps for a random whole number of
    seconds taken from ``range(0, delay_bound)``. Every yielded frame is a
    4-byte big-endian length header followed by the encoded message text.

    Args:
        delay_bound: Exclusive upper bound, in seconds, of the random pause
            before each message. The default keeps the original 0-6 s range.

    Yields:
        bytes: The length header concatenated with the encoded message.
    """
    while True:
        await asyncio.sleep(randrange(0, delay_bound))
        now = dt.datetime.now(ZoneInfo("Europe/Kyiv")).replace(microsecond=0).isoformat()
        message = f"{now} The temperature is {randrange(20, 30)} degrees Celsius"
        message_encoded = message.encode()
        # The header tells the receiver how many payload bytes follow.
        message_length = struct.pack('>I', len(message_encoded))
        yield message_length + message_encoded
async def read_massage(reader: asyncio.StreamReader) -> str:
    """Read one length-prefixed message from ``reader`` and decode it.

    The frame layout is a 4-byte big-endian unsigned length followed by that
    many bytes of encoded text.

    Args:
        reader: Stream to read the frame from.

    Returns:
        The decoded message text.

    Raises:
        asyncio.IncompleteReadError: If the stream ends before the header or
            the announced payload has been fully received.
    """
    # Read the 4-byte length header
    length_data = await reader.readexactly(4)
    (message_length,) = struct.unpack('>I', length_data)
    message_encoded = await reader.readexactly(message_length)
    return message_encoded.decode()
async def handle_client(
    _: asyncio.StreamReader,
    writer: asyncio.StreamWriter,
    weather_data: bytes,
):
    """Send one already-framed weather message to a single client.

    Args:
        _: The client's reader; unused here.
        writer: Stream the frame is written to.
        weather_data: Bytes to send verbatim.

    Raises:
        ConnectionError: Re-raised after the writer has been closed, so the
            caller can stop streaming to a client that has gone away.
    """
    try:
        writer.write(weather_data)
        # Wait until the write buffer has been flushed.
        await writer.drain()
    except ConnectionError:
        # The peer disconnected: release the transport, then let the caller
        # know so that it does not keep writing to a dead connection.
        print("Closing the connection")
        writer.close()
        raise
    except KeyboardInterrupt:
        print("Closing the connection")
        writer.close()
        sys.exit(0)
async def main():
    """Start the TCP server on localhost:8000 and serve until stopped."""

    async def client_handler(reader, writer):
        """Read the client's greeting, then stream weather frames to it."""
        message = await read_massage(reader)
        print(f"Received the message \"{message}\" that won't be handled any way")
        # NOTE: a new generator is created for every connection, so each
        # client is sent its own independently generated sequence of messages.
        async for weather_data in prepare_weather_data():
            await handle_client(reader, writer, weather_data)

    server = await asyncio.start_server(client_handler, "localhost", 8000)
    async with server:
        await server.serve_forever()
# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
我的客户端实现:
import sys
import socket
import struct
def recvall(sock: socket.socket, n: int) -> bytes | None:
# Helper function to recv n bytes or return None if EOF is hit
data = bytearray()
while len(data) < n:
packet = sock.recv(n - len(data))
if not packet:
return None
data.extend(packet)
return data
def read_massage(sock: socket.socket) -> str | None:
    """Read one length-prefixed message from ``sock`` and decode it.

    The frame layout is a 4-byte big-endian unsigned length followed by that
    many bytes of encoded text. If the peer closes the connection, either
    before the header or in the middle of the payload, "Socket closed" is
    printed and the process exits with status 0.

    Args:
        sock: Connected socket to read from.

    Returns:
        The decoded message text.
    """
    # Read the 4-byte length
    message_length_encoded = recvall(sock, 4)
    if message_length_encoded is None:
        print("Socket closed")
        sys.exit(0)
    message_length = struct.unpack('>I', message_length_encoded)[0]
    message_encoded = recvall(sock, message_length)
    if message_encoded is None:
        # The connection ended mid-payload; treat it like any other close
        # instead of failing on ``None.decode()``.
        print("Socket closed")
        sys.exit(0)
    return message_encoded.decode()
# Client entry point: connect, send a greeting frame, then print every
# message pushed by the server until interrupted.
# The ``with`` block guarantees the socket is closed on every exit path.
with socket.socket(
    socket.AF_INET,
    socket.SOCK_STREAM,
) as so:
    so.connect(("localhost", 8000))  # Blocking
    print("Connected")
    try:
        message = "Hello, give me a weather, please".encode()
        # Pack the length of the message into 4 bytes (big-endian)
        message_length = struct.pack('>I', len(message))
        frame = message_length + message
        # sendall keeps sending until the whole frame is out; plain send()
        # may transmit only part of the buffer.
        so.sendall(frame)  # Blocking
        print(f"Sent {len(frame)} bytes")
        while True:
            msg = read_massage(so)
            print(msg)
    except KeyboardInterrupt:
        so.close()
        print("Socket closed")
        sys.exit(0)
这个版本的服务器基本可以运行。服务器生成消息并将其推送给客户端。然而,所有客户端都会收到不同的消息!如何修改服务器向所有客户端推送相同的消息?我已经尽力了。
UPD
我看到两种可能的方法。
将生成的数据存储在所有客户端都可以访问的某个容器中。 像这样:
async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter, weather_data):
    """Consume the client's greeting, then forward frames from ``weather_data``.

    Args:
        reader: Stream the client's greeting message is read from.
        writer: Stream the weather frames are written to.
        weather_data: Async iterable yielding ready-to-send byte frames.
    """
    _ = await read_message(reader)
    try:
        # A single ``async for`` is enough: wrapping it in ``while True``
        # would only spin without yielding to the event loop if the
        # iterator were ever exhausted.
        async for data in weather_data:
            writer.write(data)
            await writer.drain()
    except ConnectionError:
        # The peer disconnected; stop streaming and release the transport.
        print("Closing the connection")
        writer.close()
    except KeyboardInterrupt:
        print("Closing the connection")
        writer.close()
        sys.exit(0)
async def main():
    """Start the TCP server on localhost:8000 with one shared data source."""
    # NOTE: this single generator object is handed to every connection.
    weather_data = prepare_weather_data()

    async def client_handler(reader, writer):
        """Delegate each new connection to ``handle_client``."""
        await handle_client(reader, writer, weather_data)

    server = await asyncio.start_server(client_handler, "localhost", 8000)
    async with server:
        await server.serve_forever()
但是,第二个连接的客户端会崩溃,并报如下错误:`asynchronous generator is already running`。
另一种方法是使用 `asyncio.all_tasks()`,不过我不知道怎么用。
服务器部分可能会通过以下方式进行更改:
import asyncio
import datetime as dt
import struct
from random import randrange
from zoneinfo import ZoneInfo
# Peer names of the clients that have already been sent the current frame;
# replaced with a fresh empty list whenever a new frame is produced.
writers = []
# Maps each connected client's peer name to its StreamWriter.
all_writers = {}
# Most recently generated frame (4-byte length header + payload);
# empty until the first message has been produced.
weather = b''
async def prepare_weather_data(t):
    """Generate weather frames forever and publish them via module globals.

    After a random pause of 0-6 whole seconds a new message is framed
    (4-byte big-endian length + encoded text) and stored in ``weather``;
    ``writers`` is then reset to an empty list and the message text is printed.

    Args:
        t: Task to cancel when this coroutine itself is cancelled.

    Raises:
        asyncio.CancelledError: Propagated after ``t`` has been asked to cancel.
    """
    global weather, writers
    try:
        while True:
            await asyncio.sleep(randrange(0, 7))
            now = dt.datetime.now(ZoneInfo("Europe/Kyiv")).replace(microsecond=0).isoformat()
            message = f"{now} The temperature is {randrange(20, 30)} degrees Celsius"
            message_encoded = message.encode()
            message_length = struct.pack('>I', len(message_encoded))
            weather = message_length + message_encoded
            # Forget who already received a frame so every client gets the new one.
            writers = []
            print(message)
    except asyncio.CancelledError:
        t.cancel()
        # Re-raise so this task is reported as cancelled instead of
        # finishing as if it had completed normally.
        raise
async def read_message(reader: asyncio.StreamReader) -> str:
    """Read one length-prefixed message from ``reader`` and decode it.

    The frame layout is a 4-byte big-endian unsigned length followed by that
    many bytes of encoded text.

    Args:
        reader: Stream to read the frame from.

    Returns:
        The decoded message text.

    Raises:
        asyncio.IncompleteReadError: If the stream ends before the header or
            the announced payload has been fully received.
    """
    # Read the 4-byte length header
    length_data = await reader.readexactly(4)
    (message_length,) = struct.unpack('>I', length_data)
    message_encoded = await reader.readexactly(message_length)
    return message_encoded.decode()
async def handle_client(
    writer: asyncio.StreamWriter,
):
    """Push every newly published weather frame to one client.

    The coroutine polls the module-level state: whenever this client's peer
    name is missing from ``writers`` it records the name and sends the
    current ``weather`` frame. On exit, for any reason, the client is
    removed from ``all_writers`` and its writer is closed.

    Args:
        writer: Stream of the client to serve.

    Raises:
        Exception: Any error other than a connection failure is reported
            and re-raised.
    """
    socname = writer.get_extra_info('peername')
    try:
        while True:
            # todo: refactor to avoid sleep
            # Yield to the event loop so the producer and other clients can run.
            await asyncio.sleep(0)
            if socname not in writers:
                writers.append(socname)
                writer.write(weather)
                await writer.drain()
    except ConnectionError:
        # Covers reset, aborted and broken-pipe disconnects alike.
        print(f'Socket {socname} closed')
    except Exception as e:
        print(f'ERROR: {str(e)}')
        raise
    finally:
        # Runs on disconnect, error and cancellation, so the registry never
        # keeps a stale entry and the transport is always released.
        all_writers.pop(socname, None)
        writer.close()
async def client_handler(reader, writer):
    """Register a new connection, read its greeting and start serving it.

    Args:
        reader: Stream the client's greeting is read from.
        writer: Stream registered in ``all_writers`` and used for pushing data.
    """
    socname = writer.get_extra_info('peername')
    all_writers[socname] = writer
    try:
        message = await read_message(reader)
    except (asyncio.IncompleteReadError, ConnectionError):
        # The client went away before sending a complete greeting: undo the
        # registration so the writer is neither leaked nor left open.
        all_writers.pop(socname, None)
        writer.close()
        print(f'Socket {socname} closed')
        return
    print(f"Socket {socname} connected. Received message: \"{message}\"")
    await handle_client(writer)
async def run_server():
    """Serve clients on localhost:8000 until stopped.

    Prints the bound address once the server is listening. When serving
    ends for any reason, every writer still registered in ``all_writers``
    is closed.
    """
    server = await asyncio.start_server(client_handler, "localhost", 8000)
    print(f'Serving on {server.sockets[0].getsockname()}')
    async with server:
        try:
            await server.serve_forever()
        finally:
            # Close the client connections that are still registered.
            for writer in all_writers.values():
                writer.close()
async def main():
try:
async with asyncio.TaskGroup() as group:
t = group.create_task(run_server())
group.create_task(prepare_weather_data(t))
except* asyncio.CancelledError:
print('Stopped by user')
# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
这不是一个完美的解决方案,但它是一个可行的解决方案。从我的角度来看,以下几点可能还有改进空间:
- `await asyncio.sleep(0)`
- `t.cancel()` 显式取消任务;按照文档
- `f'Socket {socname} closed'`
我尝试改进脚本,但尚未找到有效的改进。如果您能提供想法,我将不胜感激。