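I'm building a lightweight proxy server with Tornado that logs each request and its response to a Kafka topic. The flow is: client request -> Tornado proxy (which I'm developing) -> TensorFlow Serving API, and the response must get back to the client within 20 ms.
Versions: Tornado 6.0.4, kafka-python.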
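With the Tornado proxy alone (no Kafka logging), the response time stays under 14 ms. With Kafka logging enabled, it climbs to 20000 ms, which is unacceptable. How can I reduce the response time when publishing to Kafka? I'm new to the Tornado framework.
Any help is appreciated.
I did see this post here, but the Kiel Python library seems to have last been updated 4 years ago. Does it still work? Or is there a better way to do this now?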
Main server code:
```python
import json

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.web import Application, RequestHandler

from model_proxy_server.tfs_request import request
from model_proxy_server.model_request_response_publisher import publish_message


class PostToTfs(RequestHandler):
    @gen.coroutine
    def post(self, *args, **kwargs):
        # Forward the incoming body to TensorFlow Serving on the same path
        model_url = 'http://localhost:8501' + self.request.path
        response = yield request(self.request.body, model_url)
        resp = json.loads(response)
        self.write(resp)  # write() only buffers output and returns None; nothing to yield

        kafka_msg = {
            'request_url': model_url,
            'request': json.loads(self.request.body),
            'response': resp,
        }
        msg = json.dumps(kafka_msg)
        # Publish the request and response to the Kafka topic
        yield publish_message(msg, 'dev-ml-model-logs')


def make_app():
    # TODO: figure out how to host this in PROD under a common URL for this app
    urls = [(r"/.*", PostToTfs)]
    return Application(urls, debug=True)


def main():
    app = make_app()
    app.listen(3000)
    IOLoop.current().start()


if __name__ == '__main__':
    main()
```
Request handler:
```python
import logging

import tornado.httpclient
from tornado import gen

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


@gen.coroutine
def json_fetch(http_client, body, model_url):
    try:
        response = yield http_client.fetch(model_url, method='POST', body=body)
    except Exception as e:
        logger.info("Exception on http_client fetch from url")
        logger.error("Error: " + str(e))
        raise  # re-raise; otherwise `response` would be unbound below
    return response


@gen.coroutine
def request(body, model_url):
    http_client = tornado.httpclient.AsyncHTTPClient()
    http_response = yield json_fetch(http_client, body, model_url)
    return http_response.body
```
Kafka publisher:
```python
import logging

from kafka import KafkaProducer
from tornado import gen

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


@gen.coroutine
def publish_message(msg, topic_name):
    # A new KafkaProducer (and a new broker connection) is created on every call
    producer_instance = _kafka_connect()
    msg_bytes = bytes(msg, encoding='utf-8')
    try:
        # kafka-python's send() returns a kafka future, not a Tornado future,
        # so it cannot be yielded inside a Tornado coroutine
        producer_instance.send(topic_name, msg_bytes)
        logger.info("Message published successfully")
    except Exception as e:
        logger.info("Exception in publishing message")
        logger.error(str(e))


def _kafka_connect():
    # KafkaProducer() is a regular blocking constructor, not a coroutine
    try:
        producer = KafkaProducer(bootstrap_servers='ec2prdkafka01:9093', api_version=(0, 10))
    except Exception as e:
        logger.info("Exception while connecting to Kafka")
        logger.error(str(e))
        raise  # re-raise; otherwise `producer` would be unbound below
    return producer
```
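OK, here is how I solved the problem and brought my proxy server's response time down from 20000 ms to 20 ms.
The Kafka publisher module created a new connection to Kafka every time it was called, and that connection setup was the response-time bottleneck.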
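The difference between creating a producer per call and reusing one can be illustrated without a Kafka cluster. This is a minimal sketch under stated assumptions: `FakeProducer`, `publish_per_call` and `publish_shared` are hypothetical names, and the constructor simply sleeps to mimic connection setup; the real cost of constructing a `KafkaProducer` depends on your brokers and network.

```python
import time


class FakeProducer:
    """Hypothetical stand-in for KafkaProducer (assumption, not kafka-python):
    construction is slow (simulated connection setup), send() is cheap."""

    connections = 0  # how many "connections" have been opened in total

    def __init__(self, setup_delay=0.05):
        time.sleep(setup_delay)  # simulate bootstrap / connection cost
        FakeProducer.connections += 1
        self.sent = []

    def send(self, topic, value):
        self.sent.append((topic, value))


def publish_per_call(msg, topic):
    # Pattern from the question: a brand-new producer for every message
    producer = FakeProducer()
    producer.send(topic, msg.encode('utf-8'))


def publish_shared(producer, msg, topic):
    # Pattern from the answer: one producer created at startup and reused
    producer.send(topic, msg.encode('utf-8'))


if __name__ == '__main__':
    n = 10
    start = time.perf_counter()
    for i in range(n):
        publish_per_call(f'msg {i}', 'dev-ml-model-logs')
    per_call = time.perf_counter() - start

    shared_producer = FakeProducer()  # paid once, e.g. in make_app()
    start = time.perf_counter()
    for i in range(n):
        publish_shared(shared_producer, f'msg {i}', 'dev-ml-model-logs')
    shared = time.perf_counter() - start

    print(f'per-call: {per_call * 1000:.1f} ms, shared: {shared * 1000:.1f} ms')
    print('connections opened:', FakeProducer.connections)
```

With the per-call pattern the setup cost is paid on every request; with the shared pattern it is paid once at startup.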
So I now create the Kafka connection once, when the proxy server starts, as shown below:
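```python
from kafka import KafkaProducer


def make_app():
    # Create the producer once, at startup, and share it across all requests
    producer = KafkaProducer(bootstrap_servers='ec2prdkafka01:9093', api_version=(0, 10))
    # The kwargs dict must be the third element of the URL tuple
    urls = [(r"/.*", PostToTfs, dict(producer=producer))]
    return Application(urls, debug=True)
```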
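Once the producer is passed in this way, Tornado calls the handler's `initialize()` with those keyword arguments for each request, so the handler only has to store the producer to use it directly: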
```python
class PostToTfs(RequestHandler):
    def initialize(self, producer):
        # Called by Tornado with the URL spec's kwargs, before post() runs
        self.producer = producer
```
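To show how `post` might then use the stored producer, here is a minimal sketch. `build_kafka_message`, `log_exchange` and `RecordingProducer` are hypothetical helpers, not part of the original code or of kafka-python; the only assumption about the real producer is its `send(topic, value)` call. In kafka-python, `send()` normally appends the record to an in-memory buffer and returns a future, although it can still block while fetching topic metadata or when the buffer is full.

```python
import json

TOPIC = 'dev-ml-model-logs'


def build_kafka_message(model_url, request_body, response_body):
    """Build the same JSON record the question's handler logs, as UTF-8 bytes."""
    kafka_msg = {
        'request_url': model_url,
        'request': json.loads(request_body),
        'response': json.loads(response_body),
    }
    return json.dumps(kafka_msg).encode('utf-8')


def log_exchange(producer, model_url, request_body, response_body, topic=TOPIC):
    """Send one request/response record on an already-connected producer.

    `producer` is whatever initialize() stored as self.producer; no new
    connection is opened here."""
    return producer.send(topic, build_kafka_message(model_url, request_body, response_body))


class RecordingProducer:
    """Hypothetical test double exposing the same send(topic, value) shape."""

    def __init__(self):
        self.sent = []

    def send(self, topic, value):
        self.sent.append((topic, value))


if __name__ == '__main__':
    producer = RecordingProducer()
    log_exchange(producer, 'http://localhost:8501/v1/models/demo:predict',
                 b'{"instances": [1]}', b'{"predictions": [2]}')
    print(producer.sent)
```

Inside the handler, the publishing step would then become something like `log_exchange(self.producer, model_url, self.request.body, response)` instead of calling `publish_message`.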
Hope this helps anyone looking for this kind of information.