我正在尝试使用 XPUB-XSUB 代理通过 ZeroMQ 创建消息总线。
订阅者和发布者的数量会随着他们的来去而变化。目标是可能有许多守护进程可以通过此消息总线进行通信。
问题是,当已经有订阅者连接到代理的 XPUB 端并且新的发布者连接并立即开始发送消息时,发布者将不会收到第一条消息。
我认为问题在于,当发布者连接时,有关订阅者的信息不会立即到达,并且第一条消息会在套接字上被丢弃。
简单但不可靠的解决方案是在连接和发送消息之间在发布者端添加少量睡眠。
有什么好的方法等待订阅转发吗?或者我应该使用其他类型的套接字?
我有以下例子:
import threading
import time
from zmq import Context, Socket, proxy
from zmq.constants import PUB, SUB, XPUB, XPUB_VERBOSE, XSUB
def message_bus():
context = Context.instance()
in_socket: Socket = context.socket(XSUB)
in_socket.bind("ipc:///tmp/in_socket.ipc")
out_socket: Socket = context.socket(XPUB)
out_socket.bind("ipc:///tmp/out_socket.ipc")
out_socket.setsockopt(XPUB_VERBOSE, True)
proxy(in_socket, out_socket)
def publisher():
context = Context.instance()
bus_in_socket: Socket = context.socket(PUB)
bus_in_socket.connect("ipc:///tmp/in_socket.ipc")
count = 1
while True:
bus_in_socket.send_string(f"message number {count}")
count += 1
time.sleep(0.5)
def subscriber():
context = Context.instance()
bus_out_socket: Socket = context.socket(SUB)
bus_out_socket.connect("ipc:///tmp/out_socket.ipc")
bus_out_socket.subscribe("")
while True:
print(f"subscriber {bus_out_socket.recv_multipart()}")
if __name__ == "__main__":
message_bus_thread = threading.Thread(target=message_bus, daemon=True)
subscriber_thread = threading.Thread(target=subscriber, daemon=True)
publisher_thread = threading.Thread(target=publisher, daemon=True)
message_bus_thread.start()
subscriber_thread.start()
time.sleep(1)
publisher_thread.start()
message_bus_thread.join()
subscriber_thread.join()
publisher_thread.join()
输出:
subscriber [b'message number 2']
subscriber [b'message number 3']
subscriber [b'message number 4']
subscriber [b'message number 5']
subscriber [b'message number 6']
subscriber [b'message number 7']
subscriber ...
正如您所看到的,第一条消息
[b'message number 1']
根本没有发送。
对此的一种解决方案是使用 PUSH-PULL 套接字而不是 PUB-XSUB。
import threading
import time
from zmq import Context, Socket, proxy
from zmq.constants import PUB, PULL, PUSH, SUB
def message_bus():
context = Context.instance()
in_socket: Socket = context.socket(PULL)
in_socket.bind("ipc:///tmp/in_socket.ipc")
out_socket: Socket = context.socket(PUB)
out_socket.bind("ipc:///tmp/out_socket.ipc")
proxy(in_socket, out_socket)
def publisher():
context = Context.instance()
bus_in_socket: Socket = context.socket(PUSH)
bus_in_socket.connect("ipc:///tmp/in_socket.ipc")
count = 1
while True:
bus_in_socket.send_string(f"message number {count}")
count += 1
time.sleep(1)
def subscriber():
context = Context.instance()
bus_out_socket: Socket = context.socket(SUB)
bus_out_socket.connect("ipc:///tmp/out_socket.ipc")
bus_out_socket.subscribe("")
while True:
print(f"subscriber {bus_out_socket.recv_multipart()}")
if __name__ == "__main__":
message_bus_thread = threading.Thread(target=message_bus, daemon=True)
subscriber_thread = threading.Thread(target=subscriber, daemon=True)
publisher_thread = threading.Thread(target=publisher, daemon=True)
message_bus_thread.start()
subscriber_thread.start()
time.sleep(1)
publisher_thread.start()
message_bus_thread.join()
subscriber_thread.join()
publisher_thread.join()