我当前运行一个流程以及一个 API。这两者需要进行通信,更具体地说是进程的 API。为此,我使用 Zeromq 的 Python 库。我的代码如下:
server.py
:
from zmq import POLLIN, ROUTER
from zmq.asyncio import Context, Poller
from zmq.auth.asyncio import AsyncioAuthenticator
async def start(self):
"""Starts the zmq server asyncronously and handles incoming requests"""
context = Context()
auth = AsyncioAuthenticator(context)
auth.start()
auth.configure_plain(domain="*", passwords={"user": IPC_TOKEN})
auth.allow("127.0.0.1")
socket = context.socket(ROUTER)
socket.plain_server = True
socket.bind("tcp://*:5555")
poller = Poller()
poller.register(socket, POLLIN)
while True:
socks = dict(await poller.poll())
if socket in socks and socks[socket] == POLLIN:
message = await socket.recv_multipart()
identity, request = message
decoded = loads(request.decode())
res = await getattr(self, decoded["route"])(decoded["data"])
if res:
await socket.send_multipart([identity, dumps(res).encode()])
else:
await socket.send_multipart([identity, b'{"status":"ok"}'])
client.py
:
from zmq import DEALER, POLLIN
from zmq.asyncio import Context, Poller
async def make_request(route: str, data: dict) -> dict:
context = Context.instance()
socket = context.socket(DEALER)
socket.identity = uuid.uuid4().hex.encode('utf-8')
socket.plain_username = b"user"
socket.plain_password = IPC_TOKEN.encode("UTF-8")
socket.connect("tcp://localhost:5555")
request = json.dumps({"route": route, "data": data}).encode('utf-8')
socket.send(request)
poller = Poller()
poller.register(socket, POLLIN)
while True:
events = dict(await poller.poll())
if socket in events and events[socket] == POLLIN:
multipart = json.loads((await socket.recv_multipart())[0].decode())
socket.close()
context.term()
return multipart
这对于前几个请求来说工作得很好,但是在一定数量的请求之后,此代码会默默地失败(没有错误),并且我没有得到 IPC 响应。当我将使用此代码作为客户端的 API 卷曲到我的第二个进程时,请求超时。
我认为这是因为进程没有正确关闭或堵塞,但我不知道如何解决这个问题。在此代码运行一段时间后停止工作后,我在服务器上运行了 lsof 命令:
如何防止这些连接在一段时间后超时?
使用
ROUTER
套接字,身份和实际消息之间有一个空帧:https://zguide.zeromq.org/docs/chapter3/#ROUTER-Broker-and-REQ-Workers
不确定为什么它会在前几次尝试中起作用,但我认为您需要更改代码才能执行以下操作:
identity, empty, request = message
...
if res:
await socket.send_multipart([identity, b'', dumps(res).encode()])
else:
await socket.send_multipart([identity, b'', b'{"status":"ok"}'])