ZMQ/0MQ如何拥有多个发布者和订阅者?

问题描述 投票:0回答:2

如何创建一个允许多个发布者和这些发布者的多个订阅者的网络?

或者是否绝对需要使用消息代理?

import time
import zmq
from multiprocessing import Process

def bind_pub(sleep_seconds, max_messages, pub_id):
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:5556")

    message = 0
    while True:
        socket.send_string("1 sending_func=bind_pub message_number=%s pub_id=%s" % (message, pub_id))
        message += 1
        if message >= max_messages:
            break
        time.sleep(sleep_seconds)

def bind_sub(sleep_seconds, max_messages, sub_id):
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.bind("tcp://*:5556")
    socket.setsockopt_string(zmq.SUBSCRIBE, '1')

    message_n = 0
    while True:
        message = socket.recv_string()
        print(message + " receiving_func=bind_sub sub_id=%s" % sub_id)
        message_n += 1
        if message_n >= max_messages - 1:
            break
        time.sleep(sleep_seconds)

def conect_pub(sleep_seconds, max_messages, pub_id):
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.connect("tcp://localhost:5556")

    message = 0
    while True:
        socket.send_string("1 sending_func=conect_pub message_number=%s pub_id=%s" % (message, pub_id))
        message += 1
        if message >= max_messages:
            break
        time.sleep(sleep_seconds)

def connect_sub(sleep_seconds, max_messages, sub_id):
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect("tcp://localhost:5556")
    socket.setsockopt_string(zmq.SUBSCRIBE, '1')

    message_n = 0
    while True:
        message = socket.recv_string()
        print(message + " receiving_func=connect_sub sub_id=%s" % sub_id)
        message_n += 1
        if message_n >= max_messages - 1:
            break
        time.sleep(sleep_seconds)

尝试bind_pub、connect_pub、connect_sub、connect_sub网络架构时:

# bind_pub, connect_pub, connect_sub, connect_sub
n_messages = 4
p1 = Process(target=bind_pub, args=(1,n_messages,1))
p2 = Process(target=conect_pub, args=(1,n_messages,2))
p3 = Process(target=connect_sub, args=(0.1,n_messages,1))
p4 = Process(target=connect_sub, args=(0.1,n_messages,2))
p1.start()
p2.start()
p3.start()
p4.start()
p1.join()
p2.join()
p3.join()
p4.join()

导致

pub_id=2
消息丢失:

1 sending_func=bind_pub message_number=1 pub_id=1 receiving_func=connect_sub sub_id=2
1 sending_func=bind_pub message_number=1 pub_id=1 receiving_func=connect_sub sub_id=1
1 sending_func=bind_pub message_number=2 pub_id=1 receiving_func=connect_sub sub_id=2
1 sending_func=bind_pub message_number=2 pub_id=1 receiving_func=connect_sub sub_id=1
1 sending_func=bind_pub message_number=3 pub_id=1 receiving_func=connect_sub sub_id=1
1 sending_func=bind_pub message_number=3 pub_id=1 receiving_func=connect_sub sub_id=2

类似地运行 connect_pub、connect_pub、connect_sub、bind_sub 架构:

# connect_pub, connect_pub, connect_sub, bind_sub
n_messages = 4
p1 = Process(target=conect_pub, args=(1,n_messages,1))
p2 = Process(target=conect_pub, args=(1,n_messages,2))
p3 = Process(target=bind_sub, args=(0.1,n_messages,1))
p4 = Process(target=connect_sub, args=(0.1,n_messages,2))
p1.start()
p2.start()
p3.start()
p4.start()
p1.join()
p2.join()
p3.join()
p4.join()

导致

sub_id=2
没有收到消息:

1 sending_func=conect_pub message_number=1 pub_id=1 receiving_func=bind_sub sub_id=1
1 sending_func=conect_pub message_number=1 pub_id=2 receiving_func=bind_sub sub_id=1
1 sending_func=conect_pub message_number=2 pub_id=1 receiving_func=bind_sub sub_id=1
zeromq distributed-system
2个回答
1
投票

当然没有必要使用代理来实现多对多网络,但是代理确实简化了配置,因为每个节点只需要知道代理的地址,而不需要知道其所有对等点。

另一种可能性是混合方法——使用代理在对等点之间交换地址信息,以便它们可以直接相互连接。您可以在此处找到示例:https://github.com/nyfix/OZ/blob/master/doc/Naming-Service.md


1
投票

好吧,
公平地说ZeroMQ主要是一个无代理框架,

这意味着第二个问题已经得到了先验的解决 - 不,它不仅不是绝对必要,而且原则上也是不可能的(如果一个人不实现Broker-(半)持久性作为禅宗- of-Zero 标准 ZeroMQ 工具基础层是一个额外的附加组件)。


接下来,
ZeroMQ 工具到目前为止还不是您所知道的“socket”:

这是一个经常被重新阐明的误解,所以让我以粗体重复一遍。

注意:
ZeroMQ

Socket()
实例不是您所知道的 TCP 套接字。最好在不到五秒的时间内阅读 ZeroMQ 层次结构中的主要概念差异 或其他帖子和讨论。


然而,
更重要的是,
似乎没有未涵盖的明确需求:

ZeroMQ 可以服务所有:

many-PUB-s : many-SUB-s           -or-  
 one-PUB   : many-SUB-s           -or- even  
many-PUB-s :  one-SUB

这些“many”中的全部或部分仍然可以被

.connect()
连接到单个或多个接入点,因此生成的拓扑可能会变得非常疯狂(有关详细信息,请检查上面提供的链接到a “五秒”读作)所以,自己的想象力似乎是这样做的唯一上限。

© www.soinside.com 2019 - 2024. All rights reserved.