我正在尝试使用Python中的Zero MQ“模拟分布式环境中6个节点之间的消息传递”,特别是使用带有REQ和REP的经典客户端/服务器架构。我的想法是,在这些节点之间使用TCP/IP连接时,在第一次迭代中,node-1 必须是服务器,客户端是其他节点。在下一个中,node-2 将是服务器,其余的(包括 node-1)应该是客户端,依此类推。在每次迭代中,服务器都会告知它已经建立了自身,并且客户端向服务器发送请求,服务器会向服务器发回确认。一旦收到 ACK,客户端就会将其“MESSAGE”发送到服务器(这当然被视为输出),然后我们进入下一个迭代。
现在的问题是我面临着众所周知的
ZMQError: Address already in use
socket.close()
和 context.term()
,但没有成功。@staticmethod
def server(node_id):
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:%s" % port_s)
print "Running server node %s on port: %s and value of server = __temp__" % (node_id, port_s)
message = socket.recv()
print "Received request : %s from c_node %s and value (temp): __value__" % (message, c_node)
socket.send("Acknowledged - from %s" % port_s)
time.sleep(1)
socket.close()
context.term()
@staticmethod
def client(c_node):
context = zmq.Context()
# print "Server node __num__ with port %s" % port_s
socket = context.socket(zmq.REQ)
#for port in ports:
socket.connect ("tcp://localhost:%s" % port_c)
#for request in range(20):
print "c_node %s Sending request to server node __num__" % c_node
socket.send ("Hello")
message = socket.recv()
print "Received ack from server %s and message %s" % (node_id, message)
time.sleep (1)
socket.close()
context.term()
def node(self, node_id):
#global node_id
# global key
# ser_p = Process(target=self.server, args=(node_id,))
print 'Memory content of node %d\n' % node_id
for key in nodes_memory[node_id]:
print 'Neighbor={%s}, Temp={%s}\n' % (key, nodes_memory[node_id][key])
#return key
global c_node
#key1 = key
# cli_p = Process(target=self.client, args=(c_node,))
with open("Book5.csv","r+b") as input:
has_header = csv.Sniffer().has_header(input.read(1024))
input.seek(0) # rewind
incsv = csv.reader(input)
if has_header:
next(incsv) # skip header
csv_dict = csv.DictReader(input, skipinitialspace=True, delimiter=",")
node_id = 0
for row in csv_dict:
for i in row:
#print(row[i])
if type(row[i]) is str:
g.add_edge(node_id, int(i), conn_prob=(float(row[i])))
max_wg_ngs = sorted(g[node_id].items(), key=lambda e: e[1]["conn_prob"], reverse=True)[:2]
#maxim = max_wg_ngs.values.tolist()
#sarr = [str(a) for a in max_wg_ngs]
print "\nNeighbours of Node %d are:" % node_id
#print(max_wg_ngs)
ser_p = multiprocessing.Process(target=self.server, args=(node_id,))
ser_p.start()
for c_node, data in max_wg_ngs:
for key in nodes_memory[node_id]: #print ''.join(str(item))[1:-1]
#if type(key1) == node_id:
cli_p = multiprocessing.Process(target=self.client, args=(c_node,))
cli_p.start()
print('Node {a} with Connection Rate = {w}'.format(a=c_node, w=data['conn_prob']))
print('Temperature of Node {a} = {b}'.format(a=c_node, b=nodes_memory[node_id][key]))
node_id += 1
pos=nx.spring_layout(g, scale=100.)
nx.draw_networkx_nodes(g, pos)
nx.draw_networkx_edges(g,pos)
nx.draw_networkx_labels(g,pos)
#plt.axis('off')
#plt.show()
“消息”是“温度”(文件未显示在代码片段中,但目前不需要),作为参考,
Book5.csv
的值是 -0,1,2,3,4,5
0,0.257905291,0.775104118,0.239086843,0.002313744,0.416936603
0.346100279,0,0.438892758,0.598885794,0.002263231,0.406685237
0.753358102,0.222349243,0,0.407830809,0.001714776,0.507573592
0.185342928,0.571302688,0.51784403,0,0.003231018,0.295197533
0,0,0,0,0,0
0.478164621,0.418192795,0.646810223,0.410746629,0.002414973,0
ser_p
和cli_p
是在node
函数中调用的服务器和客户端函数的对象,即ser_p
在循环for row in csv_dict
中调用,cli_p
在for c_node, data in max_wg_ngs
中进一步调用。我在这里也使用 Networkx Python 库(仅使用 Book5.csv
中的连接概率值从客户端中找到 2 个最近的邻居)。请注意,
port_c
和port_s
在词汇上都是正确的(从未在其他地方定义过)。
如果多个进程尝试
.bind()
到通过 port_s
设置的同一端口#,它们只会(并且必须)发生冲突并陷入 ZMQError: Address already in use
异常。首先重新启动操作系统,接下来预扫描已使用的 IP:port#
-s,接下来设置非冲突服务器端 .bind()
(可能仍然存在挂起的 .context()
且未终止 .socket()
实例) (s)(通常来自手动原型设计或未处理的异常),保留 IP:port#
而不使其空闲,因此重新启动 + 端口扫描是一种进步。
使用任何确定性的、主要是非冲突的 server-2-
<transport-class://address:port>
映射(使用通配符的 .bind()
-s,锁定所有 IP 地址,是一个有点危险的习惯),您的代码将顺利运行。
始终使用
<socket>.setsockopt( zmq.LINGER, 0 )
以防止无限死锁。
始终使用
try: {...} except: {...} finally: {...}
正式表达式,以避免任何未处理的异常,从而孤立任何超出你控制范围的 .context()
实例,也许没有优雅的 .term() 启动和释放(即使新的 API 告诉你,这没有必要 - 明确处理这些情况并保持控制是专业的,所以没有例外,没有借口)。