Tornado PeriodicCallback在使用较大的callback_time(即:200ms)时不起作用

问题描述 投票:0回答:1

我有一个Websocket服务器应用程序,该应用程序使用Tornado PeriodicCallback向每个Websocket客户端发送消息。

ioloop.PeriodicCallback(dispatch, 10).start()
ioloop.IOLoop.instance().start()

dispatch()函数具有一个循环,以使用RabbitMQ消息,然后将其转发到每个websocket客户端。

def dispatch():

    global channel, queue
    time_start = time.time()
    while True:
        try:
            method_frame, header_frame, body = channel.basic_get(queue)
            if method_frame:
                message = json.loads(body.decode('utf-8'))
                if 'websocket_uri' in message:
                    websocket_uri = message['websocket_uri']
                    uri = urlparse(websocket_uri)
                    path = uri.path
                else:
                    path = ''
                if 'payload' in message:
                    payload = json.dumps(message['payload'])
                else:
                    payload = ''
                for client in clients:
                    if client.path == path:
                        client.write_message(payload)
                        logger.info('WRITE: %s: %s' % (client.path, payload))
                channel.basic_ack(method_frame.delivery_tag)
        except Exception as e:
            logger.exception(str(e))
            channel.basic_nack(method_frame.delivery_tag)
        finally:
            time_end = time.time()

        if time_end - time_start > 1:
            break;

    return

以某种方式,当我使用较大的callback_time值(例如100ms或200ms)时,并非所有消息都转发到websocket客户端。但是,当我使用较小的值(如10ms或1ms)时,该功能有效。

PeriodicCallback实际如何工作?如何确保dispatch()函数总是被龙卷风调用?

谢谢

python python-3.x websocket rabbitmq tornado
1个回答
0
投票

我找到了解决方案。我将PeriodicCallback替换为add_callback

app.listen(9001)
mainloop = ioloop.IOLoop.current()
mainloop.add_callback(dispatch)
mainloop.start()

然后在add_callback函数的末尾使用dispatch(),因此将在下一次I / O迭代中调用dispatch()函数。

def dispatch():

    global channel, queue
    while True:
        try:
            method_frame, header_frame, body = channel.basic_get(queue)
            if method_frame:
                message = json.loads(body.decode('utf-8'))
                if 'websocket_uri' in message:
                    websocket_uri = message['websocket_uri']
                    uri = urlparse(websocket_uri)
                    path = uri.path
                else:
                    path = ''
                if 'payload' in message:
                    payload = json.dumps(message['payload'])
                    logger.info('Payload: %s' % payload)
                else:
                    payload = ''
                for client in clients:
                    logger.info('Path: %s' % client.path)
                    if client.path == path:
                        client.write_message(payload)
                        logger.info('WRITE: %s: %s' % (client.path, payload))
                channel.basic_ack(method_frame.delivery_tag)
            else:
                break;
        except Exception as e:
            logger.exception(str(e))
            channel.basic_nack(method_frame.delivery_tag)

    ioloop.IOLoop.current().add_callback(dispatch)
    return
© www.soinside.com 2019 - 2024. All rights reserved.