`__init__.py` 中的蓝图:
# Build the LinkedIn OAuth blueprint (make_linkedin_blueprint is presumably
# Flask-Dance's factory -- confirm against the file's imports).
linkedin_bp = make_linkedin_blueprint(
client_id=linkedin_client_id,
client_secret=linkedin_client_secret,
# NOTE(review): redirect_to is given a view endpoint name; presumably this is
# where the user is sent once the OAuth flow has completed, not the OAuth
# callback URL itself -- confirm against the Flask-Dance documentation.
redirect_to='linkedin_authorized',
scope=["openid", "profile", "email"], # OpenID Connect scopes requested from LinkedIn
)
# Mount the blueprint's routes under the /login URL prefix.
app.register_blueprint(linkedin_bp, url_prefix="/login")
下面是我在 Views.py 中定义的回调视图函数。我已经定义了 linkedin_redirect_uri、linkedin_client_id 和 linkedin_client_secret 变量,它们与我在 LinkedIn 开发者站点上配置的值完全一致。
@app.route("/login/linkedin/authorized")
def linkedin_authorized():
    """Complete the LinkedIn OAuth2 authorization-code flow by hand.

    Reads the authorization ``code`` from the query string, exchanges it for
    an access token, fetches the OpenID Connect userinfo document, finds or
    creates the matching ``User`` row, logs that user in and redirects to the
    ``profile`` endpoint.

    Returns:
        A redirect to ``profile`` on success. On failure, a JSON body of the
        form ``{"error": ...}`` with status 400 (bad or missing data from the
        client or LinkedIn), 502 (LinkedIn unreachable) or 500 (database
        error).
    """
    # NOTE(review): the blueprint is registered with url_prefix="/login", and
    # Flask-Dance presumably serves its own callback at
    # /login/linkedin/authorized -- this route may collide with it; confirm.
    # NOTE(review): the `state` query parameter is not verified here, so this
    # handler offers no CSRF protection for the OAuth flow. Verify it against
    # the value stored when the flow was started.

    # Step 1: Get 'code' from the query parameters
    code = request.args.get("code")
    if not code:
        return jsonify({"error": "Missing authorization code"}), 400

    token_data = {
        "grant_type": "authorization_code",
        "code": code,
        "redirect_uri": linkedin_redirect_uri,
        "client_id": linkedin_client_id,
        "client_secret": linkedin_client_secret,
    }

    # Step 2: Exchange 'code' for access token
    token_url = "https://www.linkedin.com/oauth/v2/accessToken"
    try:
        token_response = requests.post(
            token_url,
            data=token_data,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            timeout=10,  # never let a stalled upstream hang the worker
        )
    except requests.exceptions.RequestException as e:
        app.logger.error(f"LinkedIn token request failed: {e}")
        return jsonify({"error": "Failed to get access token"}), 502

    # Check if the token request was successful
    if token_response.status_code != 200:
        return jsonify({"error": "Failed to get access token"}), 400
    try:
        token_json = token_response.json()
    except ValueError:
        token_json = None
    access_token = token_json.get("access_token") if isinstance(token_json, dict) else None
    if not access_token:
        return jsonify({"error": "Failed to get access token"}), 400

    # Step 3: Fetch user profile using the access token
    userinfo_url = "https://api.linkedin.com/v2/userinfo"
    try:
        profile_response = requests.get(
            userinfo_url,
            headers={"Authorization": f"Bearer {access_token}"},
            timeout=10,
        )
    except requests.exceptions.RequestException as e:
        app.logger.error(f"LinkedIn userinfo request failed: {e}")
        return jsonify({"error": "Failed to fetch user profile"}), 502
    if profile_response.status_code != 200:
        return jsonify({"error": "Failed to fetch user profile"}), 400
    try:
        profile_json = profile_response.json()
    except ValueError:
        profile_json = None
    if not isinstance(profile_json, dict):
        return jsonify({"error": "Failed to fetch user profile"}), 400

    # Extract required user details from the profile
    linkedin_id = profile_json.get("sub")
    email = profile_json.get("email")
    first_name = profile_json.get("given_name")
    last_name = profile_json.get("family_name")

    # Without a LinkedIn ID the lookup below would match/create a user with
    # linkedin_id=None, so stop here instead of merely logging.
    if not linkedin_id:
        app.logger.error("LinkedIn ID missing from profile")
        return jsonify({"error": "LinkedIn ID missing from profile"}), 400

    # Prefer the first name, then the last name; fall back to a derived value
    # so the profile URL can always be built.
    username = first_name or last_name or f"linkedin_{linkedin_id}"

    # Step 4: Check if the user exists in the database
    user = User.query.filter_by(linkedin_id=linkedin_id).first()
    if user is None:
        # Create a new user if they don't exist
        user = User(
            email=email,
            username=username,
            linkedin_id=linkedin_id,
            active=True,
        )
        try:
            db.session.add(user)
            db.session.commit()
            app.logger.debug("New user created and added to the database.")
        except Exception as e:
            db.session.rollback()
            app.logger.error(f"Database error: {e}")
            # Do not log in a user that was never persisted.
            return jsonify({"error": "Database error"}), 500
    else:
        app.logger.debug(f"User with LinkedIn ID {linkedin_id} found.")

    # Step 5: Log the user in
    login_user(user)

    # Step 6: Redirect to the profile page
    return redirect(url_for('profile', username=user.username))
下面是最终对我有效的回调视图函数,希望对其他人有所帮助。
# URL of LinkedIn's JSON Web Key Set; the keys fetched from it are used to
# verify the signature of the OpenID Connect ID token.
LINKEDIN_JWKS_URI = "https://www.linkedin.com/oauth/openid/jwks"
@app.route("/linkedin_authorized")
def linkedin_authorized():
    """Finish a LinkedIn login once the OAuth session holds a token.

    Fetches the userinfo document through the authorized ``linkedin``
    session, validates the ID token's signature and audience against
    LinkedIn's published keys, stores the basic profile in the Flask session,
    finds or creates the matching ``User`` row, logs that user in and
    redirects to the ``profile`` endpoint.

    Returns:
        A redirect to ``linkedin.login`` when not yet authorized, a redirect
        to ``profile`` on success, or an error body with status 400/500.
    """
    # Step 0: Check if user is authorized via LinkedIn
    if not linkedin.authorized:
        return redirect(url_for('linkedin.login'))

    # Step 1: Retrieve user info from LinkedIn API
    try:
        resp = linkedin.get("https://api.linkedin.com/v2/userinfo")
        if not resp.ok:
            return "Failed to fetch user info", 400
        user_info = resp.json()
    except Exception:
        # Request boundary: transport, token and JSON errors all end here.
        app.logger.exception("Failed to fetch LinkedIn user info")
        return "Failed to fetch user info", 400

    # Step 2: Validate the ID Token
    token_data = linkedin.token or {}
    id_token = token_data.get("id_token")
    access_token = token_data.get("access_token")
    if not id_token or not access_token:
        return "Token missing", 400

    # Fetch LinkedIn's public keys for validation
    try:
        jwks_response = requests.get(LINKEDIN_JWKS_URI, timeout=10)
        jwks_response.raise_for_status()
        jwks = jwks_response.json()
        public_keys = {
            key["kid"]: jwt.algorithms.RSAAlgorithm.from_jwk(key)
            for key in jwks["keys"]
        }
    except (requests.exceptions.RequestException, ValueError, KeyError, TypeError, PyJWTError):
        # Covers network failures, non-2xx responses and malformed key sets.
        app.logger.exception("Failed to retrieve LinkedIn public keys")
        return "Failed to retrieve public keys", 500

    # Decode and validate the ID token. The specific handlers must come
    # before the generic PyJWTError one, otherwise they are never reached.
    try:
        header = jwt.get_unverified_header(id_token)
        key = public_keys.get(header.get("kid"))
        if key is None:
            # Token was signed with a key LinkedIn does not publish.
            return "Invalid ID Token", 400
        # Passing `audience` makes jwt.decode verify the `aud` claim itself.
        decoded_token = jwt.decode(
            id_token,
            key=key,
            algorithms=["RS256"],
            audience=app.config['LINKEDIN_OAUTH_CLIENT_ID'],
        )
    except jwt.ExpiredSignatureError:
        return "ID Token expired", 400
    except jwt.InvalidAudienceError:
        return "Invalid audience", 400
    except PyJWTError:
        return "Invalid ID Token", 400
    except Exception:
        app.logger.exception("Unexpected error while decoding the ID token")
        return "An error occurred while decoding the ID Token", 500

    linkedin_id = decoded_token.get("sub")
    if not linkedin_id:
        return jsonify({"error": "LinkedIn ID missing from profile"}), 400

    # `or ""` also covers keys that are present but null in the response.
    email = user_info.get("email") or ""
    first_name = user_info.get("given_name") or ""
    last_name = user_info.get("family_name") or ""

    # Step 3: Store user info in session
    session["linkedin_user"] = {
        "linkedin_id": linkedin_id,
        "email": email,
        "first_name": first_name,
        "last_name": last_name,
    }

    # Step 4: Check if the user exists in the database
    user = User.query.filter_by(linkedin_id=linkedin_id).first()
    if user is None:
        # Generate a username from the LinkedIn name, or a default derived
        # from the LinkedIn ID when no name was supplied.
        username = (
            f"{first_name.capitalize()}{last_name.capitalize()}"
            or f"linkedin_{linkedin_id}"
        )
        user = User(
            email=email,
            username=username,
            linkedin_id=linkedin_id,
            active=True,
        )
        try:
            db.session.add(user)
            db.session.commit()
        except Exception:
            db.session.rollback()
            app.logger.exception("Database error while creating LinkedIn user")
            return jsonify({"error": "Database error"}), 500

    # Step 5: Log the user in
    if not login_user(user):
        return jsonify({"error": "Login failed"}), 500

    # Step 6: Redirect to the profile page
    return redirect(url_for('profile', username=user.username))