Spotipy 未正确存储用户信息

问题描述 投票:0回答:1

我在使用 Flask 会话和 Spotify OAuth access_token 时遇到问题(我正在使用 Spotipy)。它没有正确存储用户的信息

我几个月前开始编程,并且正在使用 Python 开始我的第一个“大”项目,但是当用户登录我的网络应用程序时,我在处理信息时遇到了麻烦。即使当不同的用户登录时,它也始终显示我的 Spotify 用户名和播放列表信息。例如,如果一个名为“James”的用户登录,当他在

auth_url
中时,它会显示他的个人资料,但当它实际访问我的网页“getPlaylists”时,显示的所有信息都是我的。 Google/YouTube Oauth 部分运行完美,但 Spotify Oauth 则不然。

我在Spotify的开发者仪表板中添加了测试用户,但并没有解决问题。关于应该做什么有什么建议吗?我没有正确处理会话吗? 我不知道是否应该将所有代码粘贴到这里,但大部分代码如下:

import spotipy
from spotipy.oauth2 import SpotifyOAuth

from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow, Flow
from google.auth.transport.requests import Request
import google.oauth2.credentials

import json
import os
import os.path

from flask import Flask, request, url_for, session, redirect, render_template
from flask_session import Session

import clientinfo # File containing all the info necessary to login the web APP to the Spotify and youtube APIs


app = Flask(__name__)
if __name__ == "__main__":
    app.run(debug=True)

app.secret_key = os.environ['FLASK_S_KEY']

app.config['SESSION_COOKIE_NAME'] = 'Spotify Cookie'

# Configure session to use filesystem (instead of signed cookies)
app.config["SESSION_PERMANENT"] = False
app.config["SESSION_TYPE"] = "filesystem"
Session(app)

TOKEN_INFO = "token_info"

@app.after_request
def after_request(response):
    """Ensure responses aren't cached"""
    response.headers["Cache-Control"] = "no-cache, no-store, must-revalidate"
    response.headers["Expires"] = 0
    response.headers["Pragma"] = "no-cache"
    return response


def create_spotify_oauth():
    return SpotifyOAuth(
        client_id = os.environ['SPOTIFY_CLIENT_ID'],
        client_secret = os.environ['SPOTIFY_CLIENT_SECRET'],
        redirect_uri = url_for('redirectSite', _external=True),
        scope = "user-read-private playlist-read-private playlist-read-collaborative",
        show_dialog=True
    )

def get_token():
    token_info = session.get(TOKEN_INFO, None)
    if not token_info:
        raise Exception("Token not found in session")
    
    sp_oauth = create_spotify_oauth()

    if sp_oauth.is_token_expired(token_info):
        token_info = sp_oauth.refresh_access_token(token_info['refresh_token'])
        session[TOKEN_INFO] = token_info
    
    print(f"Token info: {token_info}") # Debug
    return token_info

def credentialscheck():
    # Disable OAuthlib's HTTPS verification when running locally.
    # *DO NOT* leave this option enabled in production.
    os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"
    
    json_credentials = session.get('credentials')
    
    if 'credentials' in session:
        dict_credentials = json.loads(json_credentials)
        credentials = google.oauth2.credentials.Credentials.from_authorized_user_info(dict_credentials)
        

        if credentials.expired:
            credentials.refresh(Request())

    else:
        return redirect(url_for('redirectYT'))

# Endpoint Index
@app.route('/', methods=["GET"])
def index():
    if request.method == "GET":
        return render_template("index.html")
    

# Log in user to the web app using Spotify OAuth
@app.route("/login")
def login():
     # Forget any user_id
    session.clear()

    sp_oauth = create_spotify_oauth()
    auth_url = sp_oauth.get_authorize_url()
    return redirect(auth_url)

@app.route("/logout")
def logout():
    session.clear()
    return redirect(url_for("index", _external=True))

# After login, redirect user to the page with the correct authorization token
@app.route("/redirect")
def redirectSite():
    sp_oauth = create_spotify_oauth()
    code = request.args.get("code")
    token_info = sp_oauth.get_access_token(code)
    session[TOKEN_INFO] = token_info

    return redirect(url_for("redirectYT", _external=True))

@app.route("/redirectYT")
def redirectYT():
    # Disable OAuthlib's HTTPS verification when running locally.
    # *DO NOT* leave this option enabled in production.
    os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"

   # Create OAuth flow object
    flow = Flow.from_client_secrets_file(
        'client_secrets.json',
        scopes=["https://www.googleapis.com/auth/youtube.force-ssl"])
    flow.redirect_uri = url_for('callback', _external=True)
    authorization_url, state = flow.authorization_url(
        access_type='offline',
        prompt='select_account')
    
    # Save the state so we can verify the request later
    session['state'] = state
    
    return redirect(authorization_url)

@app.route('/callback')
def callback():
    # Verify the request state
    if request.args.get('state') != session['state']:
        raise Exception('Invalid state')
    
    # Create the OAUth flow object
    flow = InstalledAppFlow.from_client_secrets_file(
        'client_secrets.json',
        scopes=["https://www.googleapis.com/auth/youtube.force-ssl"],
        state=session['state'])
    flow.redirect_uri = url_for('callback', _external=True)

    # Exchange the authorization code for an access token
    authorization_response = request.url
    flow.fetch_token(authorization_response=authorization_response)

    # Save credentials to the session
    credentials = flow.credentials
    session['credentials'] = credentials.to_json()

    return redirect(url_for('getPlaylists'))

@app.route('/getPlaylists')
def getPlaylists():
    try: 
       token_info = get_token()
       
    except Exception as e:
       print(f"Exception: {e}")
       return redirect(url_for("login", _external=True))
    
    sp = spotipy.Spotify(auth=token_info['access_token'])
    credentialscheck()
    user = sp.current_user()
    username = user['display_name']
    print(f"User: {username}") # Debug
    
    # Getting all playlists (since the current_user_playlists max limit is 50, we need a 'for' loop)
    allPlaylists = []
    i = 0
    while True:
        fiftyplaylists = sp.current_user_playlists(limit=50, offset=i * 50)['items']
        i += 1
        allPlaylists += fiftyplaylists
        if (len(fiftyplaylists)< 50):
            break

    # Filtering the data we actually need in each playlist and sorting them alphabetically)
    playlists = [{'name': playlist['name'], 'id': playlist['id']} for playlist in allPlaylists]
    playlists.sort(key=lambda x: x['name'])

    return render_template("getPlaylists.html", playlists=playlists, username=username)

我已经尝试打印

token_info
并且在
getPlaylist
中它看起来总是一样的,用户名也总是相同的。恐怕我没有正确存储用户的
token_info
,但我看不到其他方法来修复它。我尝试了在 Stack Overflow 上看到的其他代码选项,但使用它时发生的情况是一样的。

import spotipy
from spotipy.oauth2 import SpotifyOAuth

from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow, Flow
from google.auth.transport.requests import Request
import google.oauth2.credentials

import json
import os
import os.path
import time

from flask import Flask, request, url_for, session, redirect, render_template
from flask_session import Session

import clientinfo # File containing all the info necessary to login the web APP to the Spotify and youtube APIs

""" requirements [
    Flask,
    Spotipy,
    OS,
    time,
    clientinfo
]"""

app = Flask(__name__)
if __name__ == "__main__":
    app.run(debug=True)

app.secret_key = os.environ['FLASK_S_KEY']

app.config['SESSION_COOKIE_NAME'] = 'Spotify Cookie'

# Configure session to use filesystem (instead of signed cookies)
app.config["SESSION_PERMANENT"] = False
app.config["SESSION_TYPE"] = "filesystem"
Session(app)


@app.after_request
def after_request(response):
    """Ensure responses aren't cached"""
    response.headers["Cache-Control"] = "no-cache, no-store, must-revalidate"
    response.headers["Expires"] = 0
    response.headers["Pragma"] = "no-cache"
    return response


def create_spotify_oauth():
    return SpotifyOAuth(
        client_id = os.environ['SPOTIFY_CLIENT_ID'],
        client_secret = os.environ['SPOTIFY_CLIENT_SECRET'],
        redirect_uri = url_for('redirectSite', _external=True),
        scope = "user-read-private playlist-read-private playlist-read-collaborative",
        show_dialog=True
    )

def credentialscheck():
    # Disable OAuthlib's HTTPS verification when running locally.
    # *DO NOT* leave this option enabled in production.
    os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"
    
    json_credentials = session.get('credentials')
    
    if 'credentials' in session:
        dict_credentials = json.loads(json_credentials)
        credentials = google.oauth2.credentials.Credentials.from_authorized_user_info(dict_credentials)
        

        if credentials.expired:
            credentials.refresh(Request())

    else:
        return redirect(url_for('redirectYT'))

# Endpoint Index
@app.route('/', methods=["GET"])
def index():
    if request.method == "GET":
        return render_template("index.html")
    

# Log in user to the web app using Spotify OAuth
@app.route("/login")
def login():
     # Forget any user_id
    session.clear()

    sp_oauth = create_spotify_oauth()
    auth_url = sp_oauth.get_authorize_url()
    return redirect(auth_url)

@app.route("/logout")
def logout():
    session.clear()
    return redirect(url_for("index", _external=True))

# After login, redirect user to the page with the correct authorization token
@app.route("/redirect")
def redirectSite():
    sp_oauth = create_spotify_oauth()
    session.clear()
    code = request.args.get("code")
    token_info = sp_oauth.get_access_token(code)

    session["token_info"] = token_info

    return redirect(url_for("redirectYT", _external=True))

@app.route("/redirectYT")
def redirectYT():
    # Disable OAuthlib's HTTPS verification when running locally.
    # *DO NOT* leave this option enabled in production.
    os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"

   # Create OAuth flow object
    flow = Flow.from_client_secrets_file(
        'client_secrets.json',
        scopes=["https://www.googleapis.com/auth/youtube.force-ssl"])
    flow.redirect_uri = url_for('callback', _external=True)
    authorization_url, state = flow.authorization_url(
        access_type='offline',
        prompt='select_account')
    
    # Save the state so we can verify the request later
    session['state'] = state
    
    return redirect(authorization_url)

@app.route('/callback')
def callback():
    # Verify the request state
    if request.args.get('state') != session['state']:
        raise Exception('Invalid state')
    
    # Create the OAUth flow object
    flow = InstalledAppFlow.from_client_secrets_file(
        'client_secrets.json',
        scopes=["https://www.googleapis.com/auth/youtube.force-ssl"],
        state=session['state'])
    flow.redirect_uri = url_for('callback', _external=True)

    # Exchange the authorization code for an access token
    authorization_response = request.url
    flow.fetch_token(authorization_response=authorization_response)

    # Save credentials to the session
    credentials = flow.credentials
    session['credentials'] = credentials.to_json()

    return redirect(url_for('getPlaylists'))

@app.route('/getPlaylists')
def getPlaylists():
    session['token_info'], authorized = get_token(session)
    session.modified = True
    if not authorized:
        print("User not authorized!")
        return redirect('/')
    credentialscheck()

    sp = spotipy.Spotify(auth = session.get('token_info').get('access_token'))
    user = sp.current_user()
    username = user['display_name']
    print(f"User: {username}") # Debug
    
    # Getting all playlists (since the current_user_playlists max limit is 50, we need a 'for' loop)
    allPlaylists = []
    i = 0
    while True:
        fiftyplaylists = sp.current_user_playlists(limit=50, offset=i * 50)['items']
        i += 1
        allPlaylists += fiftyplaylists
        if (len(fiftyplaylists)< 50):
            break

    # Filtering the data we actually need in each playlist and sorting them alphabetically)
    playlists = [{'name': playlist['name'], 'id': playlist['id']} for playlist in allPlaylists]
    playlists.sort(key=lambda x: x['name'])

    return render_template("getPlaylists.html", playlists=playlists, username=username)

def get_token(session):
    token_valid = False
    token_info = session.get("token_info", None)

    if not (session.get('token_info', False)):
        token_valid = False
        return token_info, token_valid
    
    now = int(time.time())
    is_token_expired = session.get('token_info').get('expires_at') - now < 60

    if (is_token_expired):
        sp_oauth = create_spotify_oauth()
        token_info = sp_oauth.refresh_access_token(session.get('token_info').get('refresh_token'))
    
    token_valid = True
    return token_info, token_valid
python flask spotify spotipy flask-session
1个回答
0
投票

我用spotipy 库中的 Flask Session 的缓存处理程序解决了这个问题。我刚刚声明它

cache_handler = spotipy.cache_handler.FlaskSessionCacheHandler(session)
并添加到
create_spotify_oauth()
。我将链接我在spotipy github 中看到的示例以更好地说明:spotipy 示例

无论如何,我的代码是:

import spotipy
from spotipy.oauth2 import SpotifyOAuth

from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow, Flow
from google.auth.transport.requests import Request
import google.oauth2.credentials

import json
import os
import os.path
import time

from flask import Flask, request, url_for, session, redirect, render_template
from flask_session import Session

import clientinfo # File containing all the info necessary to login the web APP to the Spotify and youtube APIs

""" requirements [
    Flask,
    Spotipy,
    Spotify OAuth,
    Google/Youtube OAuth,
    OS,
    time,
    clientinfo
]"""

app = Flask(__name__)
if __name__ == "__main__":
    app.run(debug=True)

app.secret_key = os.environ['FLASK_S_KEY']

app.config['SESSION_COOKIE_NAME'] = 'Spotify Cookie'

# Configure session to use filesystem (instead of signed cookies)
app.config["SESSION_PERMANENT"] = False
app.config["SESSION_TYPE"] = "filesystem"
app.config['SESSION_FILE_DIR'] = './.flask_session/'
Session(app)


@app.after_request
def after_request(response):
    """Ensure responses aren't cached"""
    response.headers["Cache-Control"] = "no-cache, no-store, must-revalidate"
    response.headers["Expires"] = 0
    response.headers["Pragma"] = "no-cache"
    return response


cache_handler = spotipy.cache_handler.FlaskSessionCacheHandler(session)

def create_spotify_oauth():
    return SpotifyOAuth(
        client_id = os.environ['SPOTIFY_CLIENT_ID'],
        client_secret = os.environ['SPOTIFY_CLIENT_SECRET'],
        redirect_uri = url_for('redirectSite', _external=True),
        scope = "user-read-private playlist-read-private playlist-read-collaborative",
        cache_handler=cache_handler,
        show_dialog=True
    )

def credentialscheck():
    # Disable OAuthlib's HTTPS verification when running locally.
    # *DO NOT* leave this option enabled in production.
    os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"
    
    json_credentials = session.get('credentials')
    
    if 'credentials' in session:
        dict_credentials = json.loads(json_credentials)
        credentials = google.oauth2.credentials.Credentials.from_authorized_user_info(dict_credentials)
        

        if credentials.expired:
            credentials.refresh(Request())

    else:
        return redirect(url_for('redirectYT'))

# Endpoint Index
@app.route('/', methods=["GET"])
def index():
    if request.method == "GET":
        return render_template("index.html")
    

# Log in user to the web app using Spotify OAuth
@app.route("/login")
def login():
     # Forget any user_id
    session.clear()

    sp_oauth = create_spotify_oauth()
    auth_url = sp_oauth.get_authorize_url()
    return redirect(auth_url)

@app.route("/logout")
def logout():
    session.clear()
    return redirect(url_for("index", _external=True))

# After login, redirect user to the page with the correct authorization token
@app.route("/redirect")
def redirectSite():
    sp_oauth = create_spotify_oauth()
    session.clear()
    code = request.args.get("code")
    token_info = sp_oauth.get_access_token(code)

    session["token_info"] = token_info

    return redirect(url_for("redirectYT", _external=True))

@app.route("/redirectYT")
def redirectYT():
    # Disable OAuthlib's HTTPS verification when running locally.
    # *DO NOT* leave this option enabled in production.
    os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"

   # Create OAuth flow object
    flow = Flow.from_client_secrets_file(
        'client_secrets.json',
        scopes=["https://www.googleapis.com/auth/youtube.force-ssl"])
    flow.redirect_uri = url_for('callback', _external=True)
    authorization_url, state = flow.authorization_url(
        access_type='offline',
        prompt='select_account')
    
    # Save the state so we can verify the request later
    session['state'] = state
    
    return redirect(authorization_url)

@app.route('/callback')
def callback():
    # Verify the request state
    if request.args.get('state') != session['state']:
        raise Exception('Invalid state')
    
    # Create the OAUth flow object
    flow = InstalledAppFlow.from_client_secrets_file(
        'client_secrets.json',
        scopes=["https://www.googleapis.com/auth/youtube.force-ssl"],
        state=session['state'])
    flow.redirect_uri = url_for('callback', _external=True)

    # Exchange the authorization code for an access token
    authorization_response = request.url
    flow.fetch_token(authorization_response=authorization_response)

    # Save credentials to the session
    credentials = flow.credentials
    session['credentials'] = credentials.to_json()

    return redirect(url_for('getPlaylists'))

@app.route('/getPlaylists')
def getPlaylists():
    session['token_info'], authorized = get_token(session)
    session.modified = True
    if not authorized:
        print("User not authorized!")
        return redirect('/')
    credentialscheck()

    sp = spotipy.Spotify(auth = session.get('token_info').get('access_token'))
    user = sp.current_user()
    username = user['display_name']
    
    # Getting all playlists (since the current_user_playlists max limit is 50, we need a 'for' loop)
    allPlaylists = []
    i = 0
    while True:
        fiftyplaylists = sp.current_user_playlists(limit=50, offset=i * 50)['items']
        i += 1
        allPlaylists += fiftyplaylists
        if (len(fiftyplaylists)< 50):
            break

    # Filtering the data we actually need in each playlist and sorting them alphabetically)
    playlists = [{'name': playlist['name'], 'id': playlist['id']} for playlist in allPlaylists]
    playlists.sort(key=lambda x: x['name'])

    return render_template("getPlaylists.html", playlists=playlists, username=username)

def get_token(session):
    token_valid = False
    token_info = session.get("token_info", None)

    if not (session.get('token_info', False)):
        token_valid = False
        return token_info, token_valid
    
    now = int(time.time())
    is_token_expired = session.get('token_info').get('expires_at') - now < 60

    if (is_token_expired):
        sp_oauth = create_spotify_oauth()
        token_info = sp_oauth.refresh_access_token(session.get('token_info').get('refresh_token'))
    
    token_valid = True
    return token_info, token_valid
© www.soinside.com 2019 - 2024. All rights reserved.