使用OpenID Connect

问题描述 投票:0回答:0
这是我使用 Flask-Dance 来定义我的

__init__.py 中的蓝图:

# Build the Flask-Dance blueprint for LinkedIn and mount it under /login.
linkedin_bp = make_linkedin_blueprint(
    client_id=linkedin_client_id,
    client_secret=linkedin_client_secret,
    # This should match the name of your callback route
    redirect_to='linkedin_authorized',
    # Add required scopes here
    scope=["openid", "profile", "email"],
)
app.register_blueprint(linkedin_bp, url_prefix="/login")

这是我在 Views.py 中定义的回调视图函数。我已经定义了 linkedin_redirect_uri、linkedin_client_id 和 linkedin_client_secret 变量,它们与我在 LinkedIn 开发者平台上配置的值一致。

@app.route("/login/linkedin/authorized")
def linkedin_authorized():
    """Handle the LinkedIn OAuth callback and log the user in.

    Exchanges the ``code`` query parameter for an access token, fetches the
    OpenID Connect userinfo document, finds or creates the matching ``User``
    row by ``linkedin_id``, logs that user in and redirects to ``profile``.

    Returns:
        A redirect to the ``profile`` view on success; otherwise a JSON error
        body together with a 4xx/5xx status code.
    """
    # Step 1: Get 'code' from the query parameters.
    # NOTE(review): LinkedIn also sends back `state`; it is not verified here.
    # It should be compared with the value stored when the flow was started
    # to protect against CSRF -- confirm where that value is kept.
    code = request.args.get("code")
    if not code:
        return jsonify({"error": "Missing authorization code"}), 400

    token_data = {
        "grant_type": "authorization_code",
        "code": code,
        "redirect_uri": linkedin_redirect_uri,
        "client_id": linkedin_client_id,
        "client_secret": linkedin_client_secret,
    }

    # Step 2: Exchange 'code' for access token
    token_url = "https://www.linkedin.com/oauth/v2/accessToken"
    try:
        token_response = requests.post(
            token_url,
            data=token_data,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            timeout=10,  # never let a stalled upstream hang the worker
        )
        # Check if the token request was successful
        if token_response.status_code != 200:
            return jsonify({"error": "Failed to get access token"}), 400
        token_json = token_response.json()
    except (requests.RequestException, ValueError):
        app.logger.exception("LinkedIn token request failed")
        return jsonify({"error": "Failed to get access token"}), 502

    access_token = token_json.get("access_token")
    if not access_token:
        return jsonify({"error": "Failed to get access token"}), 400

    # Step 3: Fetch user profile using the access token
    userinfo_url = "https://api.linkedin.com/v2/userinfo"
    try:
        profile_response = requests.get(
            userinfo_url,
            headers={"Authorization": f"Bearer {access_token}"},
            timeout=10,
        )
        if profile_response.status_code != 200:
            return jsonify({"error": "Failed to fetch user profile"}), 400
        profile_json = profile_response.json()
    except (requests.RequestException, ValueError):
        app.logger.exception("LinkedIn userinfo request failed")
        return jsonify({"error": "Failed to fetch user profile"}), 502

    # Extract required user details from the profile
    first_name = profile_json.get("given_name")
    last_name = profile_json.get("family_name")
    email = profile_json.get("email")
    linkedin_id = profile_json.get("sub")
    username = first_name or last_name

    # Without a subject identifier there is nothing to key the account on;
    # stop here instead of creating/looking up a user with linkedin_id=None.
    if not linkedin_id:
        app.logger.error("LinkedIn ID missing from profile")
        return jsonify({"error": "LinkedIn ID missing from profile"}), 400

    # Step 4: Check if the user exists in the database
    user = User.query.filter_by(linkedin_id=linkedin_id).first()
    if user is None:
        # Create a new user if they don't exist
        user = User(
            email=email,
            username=username,  # Assuming first name is available
            linkedin_id=linkedin_id,
            active=True,
        )
        try:
            db.session.add(user)
            db.session.commit()
        except Exception:
            # Do not fall through and log in a user that was never persisted.
            db.session.rollback()
            app.logger.exception("Database error")
            return jsonify({"error": "Database error"}), 500
        app.logger.debug("New user created and added to the database.")
    else:
        app.logger.debug(f"User with LinkedIn ID {linkedin_id} found.")

    # Step 5: Log the user in
    login_user(user)

    # Step 6: Redirect to the profile page
    return redirect(url_for('profile', username=user.username))

这是最终对我有效的回调视图函数。希望对其他人有所帮助。
# Endpoint publishing the public keys LinkedIn signs its ID tokens with.
LINKEDIN_JWKS_URI = "https://www.linkedin.com/oauth/openid/jwks"


@app.route("/linkedin_authorized")
def linkedin_authorized():
    """Finish a Flask-Dance LinkedIn login: verify the ID token and log in.

    Fetches the userinfo document through the Flask-Dance ``linkedin``
    session, verifies the ID token's RS256 signature, expiry and audience
    against LinkedIn's published JWKS, stores the identity in the Flask
    session, finds or creates the ``User`` row keyed by ``linkedin_id``, logs
    that user in and redirects to ``profile``.

    Returns:
        A redirect on success (or to the LinkedIn login view when no token is
        held yet); otherwise an error body with a 4xx/5xx status code.
    """
    # Step 0: Check if user is authorized via LinkedIn
    if not linkedin.authorized:
        return redirect(url_for('linkedin.login'))

    # Step 1: Retrieve user info from LinkedIn API
    try:
        resp = linkedin.get("https://api.linkedin.com/v2/userinfo")
        if not resp.ok:
            return "Failed to fetch user info", 400
        user_info = resp.json()
    except Exception:
        app.logger.exception("LinkedIn userinfo request failed")
        return "Failed to fetch user info", 400

    # Step 2: Validate the ID Token
    token_data = linkedin.token or {}
    id_token = token_data.get("id_token")
    if not id_token or not token_data.get("access_token"):
        return "Token missing", 400

    # Read the key id first: a malformed token or a missing `kid` is a client
    # error, not a server error.
    try:
        kid = jwt.get_unverified_header(id_token).get("kid")
    except PyJWTError:
        return "Invalid ID Token", 400

    # Fetch LinkedIn's public keys for validation
    try:
        jwks_response = requests.get(LINKEDIN_JWKS_URI, timeout=10)
        jwks_response.raise_for_status()
        jwks_keys = jwks_response.json()["keys"]
    except (requests.exceptions.RequestException, ValueError, KeyError):
        app.logger.exception("Could not load LinkedIn JWKS")
        return "Failed to retrieve public keys", 500

    # Only convert the key that signed this token, so an unrelated or
    # unsupported entry in the key set cannot break every login.
    signing_jwk = next(
        (entry for entry in jwks_keys if entry.get("kid") == kid), None
    )
    if signing_jwk is None:
        return "Invalid ID Token", 400

    # Decode and validate the ID token. The most specific exceptions come
    # first: ExpiredSignatureError and InvalidAudienceError both derive from
    # PyJWTError and would otherwise be swallowed by the generic handler.
    try:
        decoded_token = jwt.decode(
            id_token,
            key=jwt.algorithms.RSAAlgorithm.from_jwk(signing_jwk),
            algorithms=["RS256"],
            audience=app.config['LINKEDIN_OAUTH_CLIENT_ID'],
        )
    except jwt.ExpiredSignatureError:
        return "ID Token expired", 400
    except jwt.InvalidAudienceError:
        return "Invalid audience", 400
    except PyJWTError:
        return "Invalid ID Token", 400
    except Exception:
        app.logger.exception("Unexpected error while decoding the ID Token")
        return "An error occurred while decoding the ID Token", 500

    # NOTE(review): the `iss` claim is not checked; consider passing
    # `issuer=` to jwt.decode once LinkedIn's issuer value is confirmed.

    linkedin_id = decoded_token.get("sub")
    if not linkedin_id:
        return jsonify({"error": "LinkedIn ID missing from profile"}), 400

    # `or ""` also covers fields that are present but null in the response.
    first_name = user_info.get("given_name") or ""
    last_name = user_info.get("family_name") or ""
    email = user_info.get("email") or ""

    # Step 3: Store user info in session
    session["linkedin_user"] = {
        "linkedin_id": linkedin_id,
        "email": email,
        "first_name": first_name,
        "last_name": last_name,
    }

    # Step 4: Check if the user exists in the database
    user = User.query.filter_by(linkedin_id=linkedin_id).first()
    if user is None:
        # Generate a username based on LinkedIn name or generate a default
        username = (
            f"{first_name.capitalize()}{last_name.capitalize()}"
            or f"linkedin_{linkedin_id}"
        )
        user = User(
            email=email,
            username=username,
            linkedin_id=linkedin_id,
            active=True,
        )
        try:
            db.session.add(user)
            db.session.commit()
        except Exception:
            db.session.rollback()
            app.logger.exception("Database error")
            return jsonify({"error": "Database error"}), 500

    # Step 5: Log the user in
    if not login_user(user):
        return jsonify({"error": "Login failed"}), 500

    # Step 6: Redirect to the profile page
    return redirect(url_for('profile', username=user.username))


flask oauth-2.0 openid flask-dance
最新问题
© www.soinside.com 2019 - 2025. All rights reserved.