ZMQError:在循环中使用套接字时地址已在使用中

问题描述 投票:0回答:1

我正在尝试使用Python中的Zero MQ“模拟分布式环境中6个节点之间的消息传递”,特别是使用带有REQREP的经典客户端/服务器架构。我的想法是,在这些节点之间使用TCP/IP连接时,在第一次迭代中,node-1 必须是服务器,客户端是其他节点。在下一个中,node-2 将是服务器,其余的(包括 node-1)应该是客户端,依此类推。在每次迭代中,服务器都会告知它已经建立了自身,并且客户端向服务器发送请求,服务器会向服务器发回确认。一旦收到 ACK,客户端就会将其“MESSAGE”发送到服务器(这当然被视为输出),然后我们进入下一个迭代。
现在的问题是我面临着众所周知的

ZMQError: Address already in use

我不确定这是否是由于套接字绑定造成的。我在客户端和服务器功能中添加了
socket.close()
context.term()
,但没有成功。
当我以某种方式尝试运行代码时,虚拟机陷入死锁,除非执行硬重启,否则我无法恢复。这是我的代码片段 -

@staticmethod
def server(node_id):
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind("tcp://*:%s" % port_s)
    print "Running server node %s on port: %s and value of server = __temp__" % (node_id, port_s)
    message = socket.recv()
    print "Received request : %s from c_node %s and value (temp): __value__" % (message, c_node)
    socket.send("Acknowledged - from %s" % port_s)
    time.sleep(1)
    socket.close()
    context.term() 

@staticmethod    
def client(c_node):

    context = zmq.Context()
#  print "Server node __num__ with port %s" % port_s
    socket = context.socket(zmq.REQ)
  #for port in ports:
    socket.connect ("tcp://localhost:%s" % port_c)
#for request in range(20):
    print "c_node %s Sending request to server node __num__" % c_node
    socket.send ("Hello")
    message = socket.recv()
    print "Received ack from server %s and message %s" % (node_id, message)
    time.sleep (1)
    socket.close()
    context.term() 


def node(self, node_id):
    #global node_id
   # global key
#    ser_p = Process(target=self.server, args=(node_id,))


    print 'Memory content of node %d\n' % node_id
    for key in nodes_memory[node_id]:
        print 'Neighbor={%s}, Temp={%s}\n' % (key, nodes_memory[node_id][key])
        #return key
    global c_node
    #key1 = key
#    cli_p = Process(target=self.client, args=(c_node,))


    with open("Book5.csv","r+b") as input:

        has_header = csv.Sniffer().has_header(input.read(1024))
        input.seek(0)  # rewind
        incsv = csv.reader(input)
        if has_header:
            next(incsv)  # skip header

        csv_dict = csv.DictReader(input, skipinitialspace=True, delimiter=",")
        node_id = 0
        for row in csv_dict:
            for i in row:
                #print(row[i])
                if type(row[i]) is str:
                    g.add_edge(node_id, int(i), conn_prob=(float(row[i]))) 
            max_wg_ngs = sorted(g[node_id].items(), key=lambda e: e[1]["conn_prob"], reverse=True)[:2]
            #maxim = max_wg_ngs.values.tolist()
            #sarr = [str(a) for a in max_wg_ngs]
            print "\nNeighbours of Node %d are:" % node_id       
            #print(max_wg_ngs)
            ser_p = multiprocessing.Process(target=self.server, args=(node_id,))

            ser_p.start()

            for c_node, data in max_wg_ngs:
                for key in nodes_memory[node_id]:    #print ''.join(str(item))[1:-1]
                    #if type(key1) == node_id:
                    cli_p = multiprocessing.Process(target=self.client, args=(c_node,))
                    cli_p.start()

                    print('Node {a} with Connection Rate = {w}'.format(a=c_node, w=data['conn_prob']))

                    print('Temperature of Node {a} = {b}'.format(a=c_node, b=nodes_memory[node_id][key]))

            node_id += 1

    pos=nx.spring_layout(g, scale=100.)
    nx.draw_networkx_nodes(g, pos)
    nx.draw_networkx_edges(g,pos)
    nx.draw_networkx_labels(g,pos)
    #plt.axis('off')
    #plt.show()

“消息”是“温度”(文件未显示在代码片段中,但目前不需要),作为参考,

Book5.csv
的值是 -
0,1,2,3,4,5
0,0.257905291,0.775104118,0.239086843,0.002313744,0.416936603
0.346100279,0,0.438892758,0.598885794,0.002263231,0.406685237
0.753358102,0.222349243,0,0.407830809,0.001714776,0.507573592
0.185342928,0.571302688,0.51784403,0,0.003231018,0.295197533
0,0,0,0,0,0
0.478164621,0.418192795,0.646810223,0.410746629,0.002414973,0

ser_p
cli_p
是在
node
函数中调用的服务器和客户端函数的对象,即
ser_p
在循环
for row in csv_dict
中调用,
cli_p
for c_node, data in max_wg_ngs
中进一步调用。我在这里也使用 Networkx Python 库(仅使用
Book5.csv
中的连接概率值从客户端中找到 2 个最近的邻居)。
有谁知道我可能哪里出错了?为什么它显示地址已在使用中,即使套接字在每次迭代时都关闭?
非常感谢:)(使用 Ubuntu 14.04 32 位虚拟机)

python sockets zeromq networkx pyzmq
1个回答
1
投票

请注意,

port_c
port_s
在词汇上都是正确的(从未在其他地方定义过)。

分析逻辑。

如果多个进程尝试

.bind()
到通过
port_s
设置的同一端口#,它们只会(并且必须)发生冲突并陷入
ZMQError: Address already in use
异常。首先重新启动操作系统,接下来预扫描已使用的
IP:port#
-s,接下来设置非冲突服务器端
.bind()
(可能仍然存在挂起的
.context()
且未终止
.socket()
实例) (s)(通常来自手动原型设计或未处理的异常),保留
IP:port#
而不使其空闲,因此重新启动 + 端口扫描是一种进步。

使用任何确定性的、主要是非冲突的 server-2-

<transport-class://address:port>
映射(使用通配符的
.bind()
-s,锁定所有 IP 地址,是一个有点危险的习惯),您的代码将顺利运行。

始终使用

<socket>.setsockopt( zmq.LINGER, 0 )
以防止无限死锁。

始终使用

try: {...} except: {...} finally: {...}
正式表达式,以避免任何未处理的异常,从而孤立任何超出你控制范围的
.context()
实例,也许没有优雅的 .term() 启动和释放(即使新的 API 告诉你,这没有必要 - 明确处理这些情况并保持控制是专业的,所以没有例外,没有借口)。

© www.soinside.com 2019 - 2024. All rights reserved.