I have a WebSocket server application that uses Tornado's PeriodicCallback to send messages to each WebSocket client.
ioloop.PeriodicCallback(dispatch, 10).start()
ioloop.IOLoop.instance().start()
The dispatch() function has a loop that consumes RabbitMQ messages and forwards them to each WebSocket client.
def dispatch():
    global channel, queue
    time_start = time.time()
    while True:
        method_frame = None  # stays None if basic_get itself raises
        try:
            # Fetch a single message; method_frame is None when the queue is empty
            method_frame, header_frame, body = channel.basic_get(queue)
            if method_frame:
                message = json.loads(body.decode('utf-8'))
                if 'websocket_uri' in message:
                    websocket_uri = message['websocket_uri']
                    uri = urlparse(websocket_uri)
                    path = uri.path
                else:
                    path = ''
                if 'payload' in message:
                    payload = json.dumps(message['payload'])
                else:
                    payload = ''
                # Forward the payload to every client connected on the matching path
                for client in clients:
                    if client.path == path:
                        client.write_message(payload)
                        logger.info('WRITE: %s: %s' % (client.path, payload))
                channel.basic_ack(method_frame.delivery_tag)
        except Exception as e:
            logger.exception(str(e))
            # Only nack when a message was actually received
            if method_frame:
                channel.basic_nack(method_frame.delivery_tag)
        finally:
            # Keep polling until one second has elapsed, then return
            time_end = time.time()
            if time_end - time_start > 1:
                break
    return
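For some reason, when I use a larger callback_time value (such as 100 ms or 200 ms), not all messages are forwarded to the WebSocket clients. When I use a smaller value (such as 10 ms or 1 ms), it works.
How does PeriodicCallback actually work? How can I make sure that Tornado always calls the dispatch() function?
Thanks.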
I found a solution. I replaced PeriodicCallback with add_callback:
app.listen(9001)
mainloop = ioloop.IOLoop.current()
mainloop.add_callback(dispatch)
mainloop.start()
Then I call add_callback at the end of the dispatch() function, so that dispatch() is called again on the next I/O loop iteration.
def dispatch():
    global channel, queue
    while True:
        method_frame = None  # stays None if basic_get itself raises
        try:
            method_frame, header_frame, body = channel.basic_get(queue)
            if method_frame:
                message = json.loads(body.decode('utf-8'))
                if 'websocket_uri' in message:
                    websocket_uri = message['websocket_uri']
                    uri = urlparse(websocket_uri)
                    path = uri.path
                else:
                    path = ''
                if 'payload' in message:
                    payload = json.dumps(message['payload'])
                    logger.info('Payload: %s' % payload)
                else:
                    payload = ''
                for client in clients:
                    logger.info('Path: %s' % client.path)
                    if client.path == path:
                        client.write_message(payload)
                        logger.info('WRITE: %s: %s' % (client.path, payload))
                channel.basic_ack(method_frame.delivery_tag)
            else:
                # Queue is empty: leave the loop and yield to the IOLoop
                break
        except Exception as e:
            logger.exception(str(e))
            # Only nack when a message was actually received
            if method_frame:
                channel.basic_nack(method_frame.delivery_tag)
    # Re-schedule dispatch() for the next IOLoop iteration
    ioloop.IOLoop.current().add_callback(dispatch)
    return
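
A possible refinement, which is not part of my original solution: because dispatch() re-schedules itself unconditionally, the IOLoop keeps spinning even when the queue is empty, and a long backlog is drained in one go before any other handler gets a turn. The sketch below pulls the draining logic into a helper that handles at most a fixed number of messages per call and reports whether it saw an empty queue. The names drain_batch, FakeChannel, deliver and max_messages are hypothetical and only for illustration; FakeChannel merely imitates the three-tuple that basic_get returns so the helper can be tried without a broker.

```python
import json
from collections import deque
from types import SimpleNamespace
from urllib.parse import urlparse


class FakeChannel:
    """Hypothetical stand-in for the RabbitMQ channel (basic_get/basic_ack only)."""

    def __init__(self, bodies):
        self._bodies = deque(bodies)
        self._next_tag = 0
        self.acked = []

    def basic_get(self, queue):
        # Mimic the (method_frame, header_frame, body) tuple; all None when empty
        if not self._bodies:
            return None, None, None
        self._next_tag += 1
        frame = SimpleNamespace(delivery_tag=self._next_tag)
        return frame, None, self._bodies.popleft()

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)


def drain_batch(channel, queue, deliver, max_messages=100):
    """Forward up to max_messages; return True if an empty queue was observed."""
    for _ in range(max_messages):
        method_frame, _header_frame, body = channel.basic_get(queue)
        if not method_frame:
            return True
        message = json.loads(body.decode('utf-8'))
        # Same routing rule as dispatch(): the path of websocket_uri, or ''
        path = urlparse(message.get('websocket_uri', '')).path
        payload = json.dumps(message['payload']) if 'payload' in message else ''
        deliver(path, payload)  # e.g. write to every client whose path matches
        channel.basic_ack(method_frame.delivery_tag)
    return False  # batch limit reached; more messages may be waiting
```

With a helper like this, dispatch() could call add_callback(dispatch) when drain_batch returns False and fall back to something like ioloop.IOLoop.current().call_later(0.01, dispatch) when it returns True. The 0.01 second delay is an arbitrary assumption and would need tuning.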