我正在尝试使用以下代码对zmq进行尝试,但是订阅者将对象一个接一个地获取。
以下是我的PUSH脚本:
# zmq server -- run it once
import zmq
import time
# server
# print(zmq.Context)
ctx = zmq.Context()
sock = ctx.socket(zmq.PUSH)
sock.bind('ipc:///tmp/zmqtest')
i=0
while True:
i+=1
time.sleep(0.5)
sock.send_pyobj((i))
以下是PULL脚本:
# zmq client -- run it 2,3 times in parallel
import zmq
ctx = zmq.Context() # create a new context to kick the wheels
sock = ctx.socket(zmq.PULL)
sock.connect('ipc:///tmp/zmqtest')
i=0
while True:
i+=1
o = sock.recv_pyobj()
print('received python object:', o,i)
if o == 'quit':
print('exiting.')
break
我从PULL脚本之一获得以下输出:
received python object: 1 1
received python object: 3 2
received python object: 5 3
received python object: 7 4
如何将对象同时推送到两个脚本?我尝试了PUB / SUB,但这种方式无法正常工作。 (可以检查将PUSH/PULL
替换为PUB/SUB
)
PUB-
side:# zmq PUB-server -- run it once
import zmq
import time
IPC = 'ipc:///tmp/zmqtest'
ctx = zmq.Context()
PUB = ctx.socket( zmq.PUB )
PUB.bind( IPC )
#------------------------------------------------- SELF-DEFENSIVE CONFIGURATION
PUB.setsockopt( zmq.LINGER, 0 )
PUB.setsockopt( zmq... )
#------------------------------------------------------------------------------
i = 0
while True:
i += 1
time.sleep( 0.5 )
sock.send_pyobj( ( i ) )
#------------------------------------------------------------------------------
SUB
-侧(s):# zmq SUB-client -- run x-times concurrently ( or distributed, if other TransportClasses permit )
import zmq
IPC = 'ipc:///tmp/zmqtest' # <TransportClass>://<address>, TCP,TIPC,...may follow
ctx = zmq.Context() # create a new context to kick the wheels
SUB = ctx.socket( zmq.SUB )
SUB.connect( IPC )
#------------------------------------------------- SELF-DEFENSIVE CONFIGURATION
SUB.setsockopt( zmq.LINGER, 0 )
SUB.setsockopt( zmq.SUBSCRIBE, "" )
SUB.setsockopt( zmq... )
#------------------------------------------------------------------------------
i = 0
aClk = zmq.Stopwatch()
MASK = '(i:{1:_>9d}): After{2:_>+12d} [us] did .recv() a python object:[{0:}]'
while True:
i += 1
aClk.start()
o = sock.recv_pyobj()
_ = aClk.stop()
print( MASK.format( repr( o ), i, _ ) )
if o == 'quit':
print( 'Will exit.' )
#--------------------------------------- BE NICE & FAIR TO RESOURCES
SUB.setsockopt( zmq.UNSUBSCRIBE, "" )
SUB.disconnect( IPC )
SUB.close()
ctx.term()
#-------------------------------------------------------------------
break
“是的。我需要将每个对象都发送到两个(或多个)脚本无损失”
警告,对此实行零保修。一个人可以构建自己的应用程序级协议来实现这一目标。