我在通过SocketIO从RabbitMQ向用户发送消息时遇到问题。
我有使用SocketIO integration的Flask应用程序。当前的用户流程似乎如此
问题是我无法设置RabbitMQ监听器,它通过SocketIO将消息转发给浏览器。每次我得到不同的错误。主要是连接关闭,或者我在应用程序上下文之外工作。
我尝试了很多方法,这是我的最后一个方法。
# callback
def mq_listen(uid):
rabbit = RabbitMQ()
def cb(ch, method, properties, body, mq=rabbit):
to_return = [0] # mutable
message = Message.load(body)
to_return[0] = message.get_message()
emit('report_part', {"data": to_return[0]})
rabbit.listen('results', callback=cb, id=uid)
# this is the page, which user reach
@blueprint.route('/report_result/<uid>', methods=['GET'])
def report_result(uid):
thread = threading.Thread(target=mq_listen, args=(uid,))
thread.start()
return render_template("property/report_result.html", socket_id=uid)
其中rabbit.listen方法是抽象的:
def listen(self, queue_name, callback=None, id=None):
if callback is not None:
callback_function = callback
else:
callback_function = self.__callback
if id is None:
self.channel.queue_declare(queue=queue_name, durable=True)
self.channel.basic_qos(prefetch_count=1)
self.consumer_tag = self.channel.basic_consume(callback_function, queue=queue_name)
self.channel.start_consuming()
else:
self.channel.exchange_declare(exchange=queue_name, type='direct')
result = self.channel.queue_declare(exclusive=True)
exchange_name = result.method.queue
self.channel.queue_bind(exchange=queue_name, queue=exchange_name, routing_key=id)
self.channel.basic_consume(callback_function, queue=exchange_name, no_ack=True)
self.channel.start_consuming()
这导致了
RuntimeError: working outside of request context
我会很高兴任何提示或使用示例。
非常感谢
我有一个类似的问题,在一天结束时,因为当你提出请求时,烧瓶将请求上下文传递给客户端。但解决方案不是添加app.app_context()。这是hackey,肯定会有错误,因为你不是本地发送请求上下文。
我的解决方案是创建一个重定向,以便维护请求上下文,如:
def sendToRedisFeed(eventPerson, type):
eventPerson['type'] = type
requests.get('http://localhost:5012/zmq-redirect', json=eventPerson)
这是我的重定向函数,所以每当有一个事件我想推送到我的PubSub时,它会通过这个函数,然后推送到localhost端点。
from flask_sse import sse
app.register_blueprint(sse, url_prefix='/stream')
@app.route('/zmq-redirect', methods=['GET'])
def send_message():
try:
sse.publish(request.get_json(), type='greeting')
return Response('Sent!', mimetype="text/event-stream")
except Exception as e:
print (e)
pass
现在,只要将事件推送到我的/ zmq-redirect端点,它就会被重定向并通过SSE发布。
现在最后,只是把所有东西都包起来,客户:
var source = new EventSource("/stream");
source.addEventListener(
"greeting",
function(event) {
console.log(event)
}
)
错误消息表明它是Flask问题。处理请求时,Flask会设置上下文,但由于您使用的是线程,因此上下文会丢失。当它需要时,它不再可用,因此Flask给出了“在请求上下文之外工作”错误。
解决此问题的常用方法是手动提供上下文。在文档中有一节关于这个:http://flask.pocoo.org/docs/1.0/appcontext/#manually-push-a-context
您的代码未显示socketio部分。但是我想知道使用像flask-socketio这样的东西可以简化一些东西......(https://flask-socketio.readthedocs.io/en/latest/)。我将在后台打开RabbitMQ连接(最好是一次)并使用emit
函数向连接的SocketIO客户端发送任何更新。