我目前正在开发一个可进行多个 API 调用的 Flask Web 应用程序。我希望通过向前端发送动态状态更新来指示应用程序当前所在的 API 调用,从而提高用户的透明度。我尝试使用全局变量和专用状态路由来实现此目的,但它没有按预期工作。
我的问题:
状态路由似乎没有按预期更新。谁能帮我解决这个问题或建议一个更好的方法将状态动态发送到前端?
问题陈述:
我有一个全局变量 current_status 来存储当前状态。 我在 API 调用函数中更新此变量以反映相应的状态。
from flask import Flask, jsonify
app = Flask(__name__)
# Global variable to store the current status
current_status = "Initial Status"
# Route to get the current status
@app.route('/status', methods=['GET'])
def get_status():
return jsonify({'status': current_status})
# Example method to update the status
@app.route('/update_status/<string:new_status>', methods=['PUT'])
def update_status(new_status):
global current_status
current_status = new_status
return jsonify({'message': 'Status has been updated', 'new_status': current_status})
if __name__ == '__main__':
app.run(debug=True)
我将如何使用此设置来更新路线状态?
如果您想为 Flask 应用程序存储全局状态,则需要将该数据存储在应用程序外部的某个位置。正如 @moon548834 建议的,一种选择是将其存储在本地文件中。如果您的应用程序在单个服务器上运行,那么这可以正常工作;您需要实现适当的锁定,以便从多个线程/进程访问文件而不会引起问题。
使用内置
fcntl
模块的可能解决方案可能如下所示:
import fcntl
from flask import Flask, jsonify
app = Flask(__name__)
class StateFile:
def __init__(self, path, lockfile=None):
self.path = path
self.lockfile = lockfile if lockfile else f'{self.path}.lock'
def __enter__(self):
self._fd = open(self.lockfile, 'w')
fcntl.lockf(self._fd, fcntl.LOCK_EX)
return self
def __exit__(self, *_):
fcntl.lockf(self._fd, fcntl.LOCK_UN)
self._fd.close()
def get(self):
with open(self.path, 'r') as fd:
return fd.read()
def put(self, data):
with open(self.path, 'w') as fd:
fd.write(data)
fd.truncate()
# Global variable to store the current status
current_status = StateFile('status.txt')
with current_status:
current_status.put('initial status')
# Route to get the current status
@app.route('/status', methods=['GET'])
def get_status():
with current_status:
return jsonify({'status': current_status.get()})
# Example method to update the status
@app.route('/update_status/<string:new_status>', methods=['PUT'])
def update_status(new_status):
with current_status:
current_status.put(new_status)
return jsonify({'message': 'Status has been updated', 'new_status': current_status.get()})
if __name__ == '__main__':
app.run(debug=True)
根据您的需求(例如,如果您的应用程序需要跨多个服务器运行),您可以使用 Postgresql 等数据库或 Redis 等内存数据存储来存储状态信息。