我在使用 Flask 会话和 Spotify OAuth access_token 时遇到问题(我正在使用 Spotipy)。它没有正确存储用户的信息
我几个月前开始编程,并且正在使用 Python 开始我的第一个“大”项目,但是当用户登录我的网络应用程序时,我在处理信息时遇到了麻烦。即使当不同的用户登录时,它也始终显示我的 Spotify 用户名和播放列表信息。例如,如果一个名为“James”的用户登录,当他在
auth_url
中时,它会显示他的个人资料,但当它实际访问我的网页“getPlaylists”时,显示的所有信息都是我的。 Google/YouTube Oauth 部分运行完美,但 Spotify Oauth 则不然。
我在Spotify的开发者仪表板中添加了测试用户,但并没有解决问题。关于应该做什么有什么建议吗?我没有正确处理会话吗? 我不知道是否应该将所有代码粘贴到这里,但大部分代码如下:
import spotipy
from spotipy.oauth2 import SpotifyOAuth
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow, Flow
from google.auth.transport.requests import Request
import google.oauth2.credentials
import json
import os
import os.path
from flask import Flask, request, url_for, session, redirect, render_template
from flask_session import Session
import clientinfo # File containing all the info necessary to login the web APP to the Spotify and youtube APIs
app = Flask(__name__)
if __name__ == "__main__":
app.run(debug=True)
app.secret_key = os.environ['FLASK_S_KEY']
app.config['SESSION_COOKIE_NAME'] = 'Spotify Cookie'
# Configure session to use filesystem (instead of signed cookies)
app.config["SESSION_PERMANENT"] = False
app.config["SESSION_TYPE"] = "filesystem"
Session(app)
TOKEN_INFO = "token_info"
@app.after_request
def after_request(response):
"""Ensure responses aren't cached"""
response.headers["Cache-Control"] = "no-cache, no-store, must-revalidate"
response.headers["Expires"] = 0
response.headers["Pragma"] = "no-cache"
return response
def create_spotify_oauth():
return SpotifyOAuth(
client_id = os.environ['SPOTIFY_CLIENT_ID'],
client_secret = os.environ['SPOTIFY_CLIENT_SECRET'],
redirect_uri = url_for('redirectSite', _external=True),
scope = "user-read-private playlist-read-private playlist-read-collaborative",
show_dialog=True
)
def get_token():
token_info = session.get(TOKEN_INFO, None)
if not token_info:
raise Exception("Token not found in session")
sp_oauth = create_spotify_oauth()
if sp_oauth.is_token_expired(token_info):
token_info = sp_oauth.refresh_access_token(token_info['refresh_token'])
session[TOKEN_INFO] = token_info
print(f"Token info: {token_info}") # Debug
return token_info
def credentialscheck():
# Disable OAuthlib's HTTPS verification when running locally.
# *DO NOT* leave this option enabled in production.
os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"
json_credentials = session.get('credentials')
if 'credentials' in session:
dict_credentials = json.loads(json_credentials)
credentials = google.oauth2.credentials.Credentials.from_authorized_user_info(dict_credentials)
if credentials.expired:
credentials.refresh(Request())
else:
return redirect(url_for('redirectYT'))
# Endpoint Index
@app.route('/', methods=["GET"])
def index():
if request.method == "GET":
return render_template("index.html")
# Log in user to the web app using Spotify OAuth
@app.route("/login")
def login():
# Forget any user_id
session.clear()
sp_oauth = create_spotify_oauth()
auth_url = sp_oauth.get_authorize_url()
return redirect(auth_url)
@app.route("/logout")
def logout():
session.clear()
return redirect(url_for("index", _external=True))
# After login, redirect user to the page with the correct authorization token
@app.route("/redirect")
def redirectSite():
sp_oauth = create_spotify_oauth()
code = request.args.get("code")
token_info = sp_oauth.get_access_token(code)
session[TOKEN_INFO] = token_info
return redirect(url_for("redirectYT", _external=True))
@app.route("/redirectYT")
def redirectYT():
# Disable OAuthlib's HTTPS verification when running locally.
# *DO NOT* leave this option enabled in production.
os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"
# Create OAuth flow object
flow = Flow.from_client_secrets_file(
'client_secrets.json',
scopes=["https://www.googleapis.com/auth/youtube.force-ssl"])
flow.redirect_uri = url_for('callback', _external=True)
authorization_url, state = flow.authorization_url(
access_type='offline',
prompt='select_account')
# Save the state so we can verify the request later
session['state'] = state
return redirect(authorization_url)
@app.route('/callback')
def callback():
# Verify the request state
if request.args.get('state') != session['state']:
raise Exception('Invalid state')
# Create the OAUth flow object
flow = InstalledAppFlow.from_client_secrets_file(
'client_secrets.json',
scopes=["https://www.googleapis.com/auth/youtube.force-ssl"],
state=session['state'])
flow.redirect_uri = url_for('callback', _external=True)
# Exchange the authorization code for an access token
authorization_response = request.url
flow.fetch_token(authorization_response=authorization_response)
# Save credentials to the session
credentials = flow.credentials
session['credentials'] = credentials.to_json()
return redirect(url_for('getPlaylists'))
@app.route('/getPlaylists')
def getPlaylists():
try:
token_info = get_token()
except Exception as e:
print(f"Exception: {e}")
return redirect(url_for("login", _external=True))
sp = spotipy.Spotify(auth=token_info['access_token'])
credentialscheck()
user = sp.current_user()
username = user['display_name']
print(f"User: {username}") # Debug
# Getting all playlists (since the current_user_playlists max limit is 50, we need a 'for' loop)
allPlaylists = []
i = 0
while True:
fiftyplaylists = sp.current_user_playlists(limit=50, offset=i * 50)['items']
i += 1
allPlaylists += fiftyplaylists
if (len(fiftyplaylists)< 50):
break
# Filtering the data we actually need in each playlist and sorting them alphabetically)
playlists = [{'name': playlist['name'], 'id': playlist['id']} for playlist in allPlaylists]
playlists.sort(key=lambda x: x['name'])
return render_template("getPlaylists.html", playlists=playlists, username=username)
我已经尝试打印
token_info
并且在getPlaylist
中它看起来总是一样的,用户名也总是相同的。恐怕我没有正确存储用户的 token_info
,但我看不到其他方法来修复它。我尝试了在 Stack Overflow 上看到的其他代码选项,但使用它时发生的情况是一样的。
import spotipy
from spotipy.oauth2 import SpotifyOAuth
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow, Flow
from google.auth.transport.requests import Request
import google.oauth2.credentials
import json
import os
import os.path
import time
from flask import Flask, request, url_for, session, redirect, render_template
from flask_session import Session
import clientinfo # File containing all the info necessary to login the web APP to the Spotify and youtube APIs
""" requirements [
Flask,
Spotipy,
OS,
time,
clientinfo
]"""
app = Flask(__name__)
if __name__ == "__main__":
app.run(debug=True)
app.secret_key = os.environ['FLASK_S_KEY']
app.config['SESSION_COOKIE_NAME'] = 'Spotify Cookie'
# Configure session to use filesystem (instead of signed cookies)
app.config["SESSION_PERMANENT"] = False
app.config["SESSION_TYPE"] = "filesystem"
Session(app)
@app.after_request
def after_request(response):
"""Ensure responses aren't cached"""
response.headers["Cache-Control"] = "no-cache, no-store, must-revalidate"
response.headers["Expires"] = 0
response.headers["Pragma"] = "no-cache"
return response
def create_spotify_oauth():
return SpotifyOAuth(
client_id = os.environ['SPOTIFY_CLIENT_ID'],
client_secret = os.environ['SPOTIFY_CLIENT_SECRET'],
redirect_uri = url_for('redirectSite', _external=True),
scope = "user-read-private playlist-read-private playlist-read-collaborative",
show_dialog=True
)
def credentialscheck():
# Disable OAuthlib's HTTPS verification when running locally.
# *DO NOT* leave this option enabled in production.
os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"
json_credentials = session.get('credentials')
if 'credentials' in session:
dict_credentials = json.loads(json_credentials)
credentials = google.oauth2.credentials.Credentials.from_authorized_user_info(dict_credentials)
if credentials.expired:
credentials.refresh(Request())
else:
return redirect(url_for('redirectYT'))
# Endpoint Index
@app.route('/', methods=["GET"])
def index():
if request.method == "GET":
return render_template("index.html")
# Log in user to the web app using Spotify OAuth
@app.route("/login")
def login():
# Forget any user_id
session.clear()
sp_oauth = create_spotify_oauth()
auth_url = sp_oauth.get_authorize_url()
return redirect(auth_url)
@app.route("/logout")
def logout():
session.clear()
return redirect(url_for("index", _external=True))
# After login, redirect user to the page with the correct authorization token
@app.route("/redirect")
def redirectSite():
sp_oauth = create_spotify_oauth()
session.clear()
code = request.args.get("code")
token_info = sp_oauth.get_access_token(code)
session["token_info"] = token_info
return redirect(url_for("redirectYT", _external=True))
@app.route("/redirectYT")
def redirectYT():
# Disable OAuthlib's HTTPS verification when running locally.
# *DO NOT* leave this option enabled in production.
os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"
# Create OAuth flow object
flow = Flow.from_client_secrets_file(
'client_secrets.json',
scopes=["https://www.googleapis.com/auth/youtube.force-ssl"])
flow.redirect_uri = url_for('callback', _external=True)
authorization_url, state = flow.authorization_url(
access_type='offline',
prompt='select_account')
# Save the state so we can verify the request later
session['state'] = state
return redirect(authorization_url)
@app.route('/callback')
def callback():
# Verify the request state
if request.args.get('state') != session['state']:
raise Exception('Invalid state')
# Create the OAUth flow object
flow = InstalledAppFlow.from_client_secrets_file(
'client_secrets.json',
scopes=["https://www.googleapis.com/auth/youtube.force-ssl"],
state=session['state'])
flow.redirect_uri = url_for('callback', _external=True)
# Exchange the authorization code for an access token
authorization_response = request.url
flow.fetch_token(authorization_response=authorization_response)
# Save credentials to the session
credentials = flow.credentials
session['credentials'] = credentials.to_json()
return redirect(url_for('getPlaylists'))
@app.route('/getPlaylists')
def getPlaylists():
session['token_info'], authorized = get_token(session)
session.modified = True
if not authorized:
print("User not authorized!")
return redirect('/')
credentialscheck()
sp = spotipy.Spotify(auth = session.get('token_info').get('access_token'))
user = sp.current_user()
username = user['display_name']
print(f"User: {username}") # Debug
# Getting all playlists (since the current_user_playlists max limit is 50, we need a 'for' loop)
allPlaylists = []
i = 0
while True:
fiftyplaylists = sp.current_user_playlists(limit=50, offset=i * 50)['items']
i += 1
allPlaylists += fiftyplaylists
if (len(fiftyplaylists)< 50):
break
# Filtering the data we actually need in each playlist and sorting them alphabetically)
playlists = [{'name': playlist['name'], 'id': playlist['id']} for playlist in allPlaylists]
playlists.sort(key=lambda x: x['name'])
return render_template("getPlaylists.html", playlists=playlists, username=username)
def get_token(session):
token_valid = False
token_info = session.get("token_info", None)
if not (session.get('token_info', False)):
token_valid = False
return token_info, token_valid
now = int(time.time())
is_token_expired = session.get('token_info').get('expires_at') - now < 60
if (is_token_expired):
sp_oauth = create_spotify_oauth()
token_info = sp_oauth.refresh_access_token(session.get('token_info').get('refresh_token'))
token_valid = True
return token_info, token_valid
我用spotipy 库中的 Flask Session 的缓存处理程序解决了这个问题。我刚刚声明它
cache_handler = spotipy.cache_handler.FlaskSessionCacheHandler(session)
并添加到create_spotify_oauth()
。我将链接我在spotipy github 中看到的示例以更好地说明:spotipy 示例
无论如何,我的代码是:
import spotipy
from spotipy.oauth2 import SpotifyOAuth
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow, Flow
from google.auth.transport.requests import Request
import google.oauth2.credentials
import json
import os
import os.path
import time
from flask import Flask, request, url_for, session, redirect, render_template
from flask_session import Session
import clientinfo # File containing all the info necessary to login the web APP to the Spotify and youtube APIs
""" requirements [
Flask,
Spotipy,
Spotify OAuth,
Google/Youtube OAuth,
OS,
time,
clientinfo
]"""
app = Flask(__name__)
if __name__ == "__main__":
app.run(debug=True)
app.secret_key = os.environ['FLASK_S_KEY']
app.config['SESSION_COOKIE_NAME'] = 'Spotify Cookie'
# Configure session to use filesystem (instead of signed cookies)
app.config["SESSION_PERMANENT"] = False
app.config["SESSION_TYPE"] = "filesystem"
app.config['SESSION_FILE_DIR'] = './.flask_session/'
Session(app)
@app.after_request
def after_request(response):
"""Ensure responses aren't cached"""
response.headers["Cache-Control"] = "no-cache, no-store, must-revalidate"
response.headers["Expires"] = 0
response.headers["Pragma"] = "no-cache"
return response
cache_handler = spotipy.cache_handler.FlaskSessionCacheHandler(session)
def create_spotify_oauth():
return SpotifyOAuth(
client_id = os.environ['SPOTIFY_CLIENT_ID'],
client_secret = os.environ['SPOTIFY_CLIENT_SECRET'],
redirect_uri = url_for('redirectSite', _external=True),
scope = "user-read-private playlist-read-private playlist-read-collaborative",
cache_handler=cache_handler,
show_dialog=True
)
def credentialscheck():
# Disable OAuthlib's HTTPS verification when running locally.
# *DO NOT* leave this option enabled in production.
os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"
json_credentials = session.get('credentials')
if 'credentials' in session:
dict_credentials = json.loads(json_credentials)
credentials = google.oauth2.credentials.Credentials.from_authorized_user_info(dict_credentials)
if credentials.expired:
credentials.refresh(Request())
else:
return redirect(url_for('redirectYT'))
# Endpoint Index
@app.route('/', methods=["GET"])
def index():
if request.method == "GET":
return render_template("index.html")
# Log in user to the web app using Spotify OAuth
@app.route("/login")
def login():
# Forget any user_id
session.clear()
sp_oauth = create_spotify_oauth()
auth_url = sp_oauth.get_authorize_url()
return redirect(auth_url)
@app.route("/logout")
def logout():
session.clear()
return redirect(url_for("index", _external=True))
# After login, redirect user to the page with the correct authorization token
@app.route("/redirect")
def redirectSite():
sp_oauth = create_spotify_oauth()
session.clear()
code = request.args.get("code")
token_info = sp_oauth.get_access_token(code)
session["token_info"] = token_info
return redirect(url_for("redirectYT", _external=True))
@app.route("/redirectYT")
def redirectYT():
# Disable OAuthlib's HTTPS verification when running locally.
# *DO NOT* leave this option enabled in production.
os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"
# Create OAuth flow object
flow = Flow.from_client_secrets_file(
'client_secrets.json',
scopes=["https://www.googleapis.com/auth/youtube.force-ssl"])
flow.redirect_uri = url_for('callback', _external=True)
authorization_url, state = flow.authorization_url(
access_type='offline',
prompt='select_account')
# Save the state so we can verify the request later
session['state'] = state
return redirect(authorization_url)
@app.route('/callback')
def callback():
# Verify the request state
if request.args.get('state') != session['state']:
raise Exception('Invalid state')
# Create the OAUth flow object
flow = InstalledAppFlow.from_client_secrets_file(
'client_secrets.json',
scopes=["https://www.googleapis.com/auth/youtube.force-ssl"],
state=session['state'])
flow.redirect_uri = url_for('callback', _external=True)
# Exchange the authorization code for an access token
authorization_response = request.url
flow.fetch_token(authorization_response=authorization_response)
# Save credentials to the session
credentials = flow.credentials
session['credentials'] = credentials.to_json()
return redirect(url_for('getPlaylists'))
@app.route('/getPlaylists')
def getPlaylists():
session['token_info'], authorized = get_token(session)
session.modified = True
if not authorized:
print("User not authorized!")
return redirect('/')
credentialscheck()
sp = spotipy.Spotify(auth = session.get('token_info').get('access_token'))
user = sp.current_user()
username = user['display_name']
# Getting all playlists (since the current_user_playlists max limit is 50, we need a 'for' loop)
allPlaylists = []
i = 0
while True:
fiftyplaylists = sp.current_user_playlists(limit=50, offset=i * 50)['items']
i += 1
allPlaylists += fiftyplaylists
if (len(fiftyplaylists)< 50):
break
# Filtering the data we actually need in each playlist and sorting them alphabetically)
playlists = [{'name': playlist['name'], 'id': playlist['id']} for playlist in allPlaylists]
playlists.sort(key=lambda x: x['name'])
return render_template("getPlaylists.html", playlists=playlists, username=username)
def get_token(session):
token_valid = False
token_info = session.get("token_info", None)
if not (session.get('token_info', False)):
token_valid = False
return token_info, token_valid
now = int(time.time())
is_token_expired = session.get('token_info').get('expires_at') - now < 60
if (is_token_expired):
sp_oauth = create_spotify_oauth()
token_info = sp_oauth.refresh_access_token(session.get('token_info').get('refresh_token'))
token_valid = True
return token_info, token_valid