Tornado API proxy that logs responses to Kafka

Question

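I am building a lightweight proxy server with Tornado that logs each request and response to a Kafka queue. The flow is: client request -> Tornado proxy (the part I am developing) -> TensorFlow Serving API. The response must get back to the client within 20 ms.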

Versions: Tornado 6.0.4, with kafka-python as the Kafka client.

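With the Tornado proxy alone, without logging to Kafka, the response time stays under 14 ms. With Kafka logging enabled, it rises to 20000 ms, which is unacceptable. How can I reduce the response time when publishing to Kafka? I am new to the Tornado framework.

Any help is appreciated.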

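I did see an earlier post on this topic, and the Kiel Python library it mentions seems to have last been updated 4 years ago. Is it still usable, or is there a better way to do this now?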

Main server code:

from tornado.web import Application, RequestHandler
from tornado.ioloop import IOLoop
from model_proxy_server.tfs_request import request
from tornado import gen
from model_proxy_server.model_request_response_publisher import publish_message
import json

class PostToTfs(RequestHandler):

    @gen.coroutine
    def post(self, *args, **kwargs):
        kafka_msg = {}

        # Forward the request to TensorFlow Serving on the same path
        model_url = 'http://localhost:8501' + self.request.path

        response = yield request(self.request.body, model_url)

        resp = json.loads(response)
        self.write(resp)  # write() returns None, so there is nothing to yield

        kafka_msg['request_url'] = model_url
        kafka_msg['request'] = json.loads(self.request.body)
        kafka_msg['response'] = resp
        msg = json.dumps(kafka_msg)

        # Publish the request and response to Kafka topic
        yield publish_message(msg, 'dev-ml-model-logs')


def make_app():
    # TODO Get to know on how to host this in PROD with common URL for this APP
    urls = [(r"/.*", PostToTfs)]
    return Application(urls, debug=True)

def main():
    app = make_app()
    app.listen(3000)
    IOLoop.instance().start()

if __name__ == '__main__':
    main()

Request handler:

import tornado.httpclient
from tornado.ioloop import IOLoop
from tornado import gen
import tornado.options
import logging

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

@gen.coroutine
def json_fetch(http_client, body, model_url):
    try:
        response = yield http_client.fetch(model_url, method='POST', body=body)
    except Exception as e:
        logger.info("Exception on http_client fetch from url")
        logger.error("Error: " + str(e))
        # Re-raise: otherwise `response` would be unbound below
        raise

    raise gen.Return(response)

@gen.coroutine
def request(body, model_url):

    http_client = tornado.httpclient.AsyncHTTPClient()
    http_response = yield json_fetch(http_client, body, model_url)
    return http_response.body

if __name__ == "__main__":
    tornado.options.parse_command_line()
    IOLoop.instance().run_sync(request)

Kafka publisher:

from kafka import KafkaProducer
from tornado import gen
import logging

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

@gen.coroutine
def publish_message(msg, topic_name):
    # A new producer (and a new broker connection) is created on every call
    producer_instance = _kafka_connect()
    msg_bytes = bytes(msg, encoding='utf-8')

    try:
        # send() buffers the record and returns a kafka-python future,
        # which is not a Tornado-yieldable object, so it is not yielded
        producer_instance.send(topic_name, msg_bytes)
        logger.info("Message queued for publishing")
    except Exception as e:
        logger.info("Exception in publishing message")
        logger.error(str(e))

def _kafka_connect():
    # KafkaProducer() is a blocking constructor, so this is a plain function
    try:
        producer = KafkaProducer(bootstrap_servers='ec2prdkafka01:9093', api_version=(0, 10))
    except Exception as e:
        logger.info("Exception while connecting to Kafka")
        logger.error(str(e))
        raise  # otherwise `producer` would be unbound below

    return producer

Tags: python, asynchronous, apache-kafka, responsive, tornado

1 Answer

OK, here is how I solved the problem and brought my proxy server's response time down from 20000 ms to 20 ms.

The Kafka publisher created a new connection to Kafka every time the module was called, and that was the response-time bottleneck.
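To make the bottleneck concrete, here is a minimal sketch of a publisher that connects only on first use and reuses the producer afterwards. It is not the code from the question: `get_producer`, `default_factory` and `FakeProducer` are hypothetical names introduced for illustration, and the fake producer exists only so the sketch runs without a broker.

```python
import json

_producer = None  # module-level cache, filled on first use


def default_factory():
    """Build the real producer; imported lazily so this module loads without Kafka."""
    from kafka import KafkaProducer  # kafka-python
    return KafkaProducer(bootstrap_servers='ec2prdkafka01:9093', api_version=(0, 10))


def get_producer(factory=default_factory):
    """Return the shared producer, connecting only on the first call."""
    global _producer
    if _producer is None:
        _producer = factory()
    return _producer


def publish_message(msg, topic_name, factory=default_factory):
    """Queue msg (a str) on topic_name using the shared producer."""
    producer = get_producer(factory)
    return producer.send(topic_name, msg.encode('utf-8'))


class FakeProducer:
    """Hypothetical stand-in for KafkaProducer, used only to show the caching."""
    created = 0

    def __init__(self):
        FakeProducer.created += 1
        self.sent = []

    def send(self, topic, value):
        self.sent.append((topic, value))


# Three publishes, but the (fake) connection is set up only once.
for i in range(3):
    publish_message(json.dumps({'n': i}), 'dev-ml-model-logs', factory=FakeProducer)

print(FakeProducer.created)  # 1
```

With the real `default_factory`, the first publish still pays the connection cost, which is one reason to connect at startup instead.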

So I now establish the Kafka connection once, when the proxy server starts, as shown below.

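from kafka import KafkaProducer

def make_app():
    # Connect to Kafka once at startup instead of on every request
    producer = KafkaProducer(bootstrap_servers='ec2prdkafka01:9093', api_version=(0, 10))
    # The dict is the third element of the URL spec; Tornado passes it to initialize()
    urls = [(r"/.*", PostToTfs, dict(producer=producer))]
    return Application(urls, debug=True)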

Once the producer is passed in this way, the handler needs an initialize method that stores it so that it can be used directly.

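class PostToTfs(RequestHandler):

    def initialize(self, producer):
        # Tornado calls initialize() with the dict from the URL spec
        self.producer = producer

    def post(self, *args, **kwargs):
        ...  # handler body as before, publishing through self.producer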
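Once the handler holds a shared producer, the publish step can be a plain non-blocking call. The following is a rough sketch, not the answerer's code: `build_log_record` and `log_to_kafka` are hypothetical helpers, the URL in the demo is made up, and the fake classes stand in for kafka-python so the example runs without a broker. It assumes, as in kafka-python, that `send()` returns a future with an `add_errback` method.

```python
import json
import logging

logger = logging.getLogger(__name__)


def build_log_record(request_url, request_body, response_body):
    """Serialize one request/response pair to UTF-8 JSON bytes."""
    record = {
        'request_url': request_url,
        'request': json.loads(request_body),
        'response': json.loads(response_body),
    }
    return json.dumps(record).encode('utf-8')


def log_to_kafka(producer, topic, payload):
    """Queue payload without waiting for the broker; failures are logged later."""
    future = producer.send(topic, payload)
    future.add_errback(lambda exc: logger.error("Kafka publish failed: %s", exc))
    return future


class _FakeFuture:
    """Hypothetical stand-in for the future returned by send()."""

    def __init__(self):
        self.errbacks = []

    def add_errback(self, fn):
        self.errbacks.append(fn)
        return self


class _FakeProducer:
    """Hypothetical stand-in for KafkaProducer that records what was sent."""

    def __init__(self):
        self.sent = []

    def send(self, topic, value):
        self.sent.append((topic, value))
        return _FakeFuture()


producer = _FakeProducer()
payload = build_log_record(
    'http://localhost:8501/example',  # made-up path, for illustration only
    b'{"instances": [1]}',
    b'{"predictions": [2]}',
)
log_to_kafka(producer, 'dev-ml-model-logs', payload)
```

Inside the handler, the equivalent call would pass `self.producer` and the request and response bodies already available in `post`.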

Hope this helps anyone looking for this kind of information.
