我有一个使用 asyncio 的基于 python 的 TCP 服务器,但它有一个特定的行为:TCP 客户端(PLC)连接到应用程序,但实际上服务器或客户端都可以启动数据流。一旦数据流开始,将通过专门设计的ACK消息完成。
TCP服务器部分连接一个RMQ队列;对于每条新消息,它将启动与 PLC 的数据流,等待响应,然后确认双方并完成数据流。在PLC端,可以随时启动一个流程(但是如果有正在进行的流程,就会被拒绝),然后TCP服务器会做一些处理,然后发送响应,PLC发送ACK,TCP服务器响应 ACK,则流程结束。
连接始终保持畅通。
我已经使用异步服务器和协议实现了这一点,但我发现代码过于复杂,而且我不喜欢它添加新功能或增加功能的糟糕程度。
我认为我可以使用流 API 来代替,这看起来更清晰,但我无法想象是否有一种方法可以通过阅读器处理客户端发起的请求,或者启动对客户端的服务器请求。我只是想知道这是否可能?
因此,在检查了一些可能的选项后,我有了这段代码,可以检查它是否正常工作。使用 PLC 模拟器连接到 TCP 端口,我可以启动从 PC 到 PLC 的数据流,反之亦然。
class PLCRing:
def __init__(self, port: int, host: str):
self._port = port
self._host = host
self._srv = None
self._queue = asyncio.Queue()
self._processing = asyncio.Event()
self._stopping = asyncio.Event()
async def start(self):
self._srv = await asyncio.start_server(self._process, self._host, self._port)
# Connect listener to rabbitmq and start processing, which will push
# each new message into our internal queue.
await self._messaging.connect()
sub = asyncio.create_task(self._messaging.process(self._queue)
await sub
await self._srv.serve_forever()
async def _process(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
print(f"Client connection from {writer.transport.get_extra_info("peername")}")
try:
while True:
# if we are stopping, no more processing is needed
if self._stopping.is_set():
break
# This could be not necessary, since each loop should be self-contained.
if self._processing.is_set():
# avoid doing nothing for the time being, we are still
# processing something
continue
# First doubt, I could not find a better way to check if the PLC started a communication than by checking if we can read data
try:
buff = await asyncio.wait_for(reader.read(5232), 0.5)
await self._process_msg_from_plc(buff, reader, writer)
except asyncio.TimeoutError:
# Check if we have something in the queue to process and send
if self._queue.empty():
continue
msg = await self._queue.get()
await self._process_msg_to_plc(msg, reader, writer)
self._queue.task_done()
except asyncio.IncompleteReadError:
# The PLC might have sent small amount of data until EOF
logger.warning("unexpected incomplete read")
pass
except asyncio.InvalidStateError:
# Seems the tcp connection state gone wrong
pass
except Exception as e:
logger.error(f"Something went wrong: {e}")
pass
finally:
writer.close()
await writer.wait_close()
使用上面的代码,我可以使用
writer
和 reader
以我期望的方式在两个流程中进行交互。
如上所述,我的主要疑问在于如何以正确的方式检查是否有任何待处理的数据。另外,通过使用
asyncio.wait_for
500ms 超时,我想我可以确保它不会阻塞或无休止地等待,但这也可能是一个更好的方法?