我正在尝试在我的 FastAPI 应用程序中使用 Keycloak。我的代码如下:
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from keycloak import KeycloakOpenID
import requests
import logging
import os
from .config import settings
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Keycloak configuration (every value is read from the application settings)
KEYCLOAK_SERVER_URL = settings.KEYCLOAK_SERVER_URL
KEYCLOAK_REALM = settings.KEYCLOAK_REALM
KEYCLOAK_CLIENT_ID = settings.KEYCLOAK_CLIENT_ID
KEYCLOAK_CLIENT_SECRET = settings.KEYCLOAK_CLIENT_SECRET
ALGORITHM = "RS256"
# Build the token endpoint from the configured realm. A hard-coded realm name
# here lets /login mint tokens in one realm while `keycloak_openid` (and the
# issuer check in `verify_token`) validate against another, which makes every
# token look invalid.
TOKEN_URL = f"{KEYCLOAK_SERVER_URL}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/token"

# Initialize KeycloakOpenID
keycloak_openid = KeycloakOpenID(
    server_url=f"{KEYCLOAK_SERVER_URL}",
    client_id=KEYCLOAK_CLIENT_ID,
    realm_name=KEYCLOAK_REALM,
    client_secret_key=KEYCLOAK_CLIENT_SECRET,
    # NOTE(review): verify=False turns off TLS certificate verification for
    # all calls made through this client. Acceptable for a local setup with
    # self-signed certificates only; confirm before deploying.
    verify=False,
)
# Fetched once at import time, so importing this module needs a reachable
# Keycloak server.
config_well_known = keycloak_openid.well_known()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def verify_token(token: str = Depends(oauth2_scheme)):
    """Validate a Keycloak bearer token and return its decoded claims.

    Args:
        token: Raw bearer token supplied by the ``oauth2_scheme`` dependency.

    Returns:
        The claims returned by ``keycloak_openid.decode_token``.

    Raises:
        HTTPException: 401 with detail ``"Invalid token"`` when the token
            cannot be decoded/validated or lacks the ``preferred_username``
            or ``iss`` claim; 401 with detail ``"Invalid issuer"`` when the
            ``iss`` claim does not match the configured server and realm.
    """
    try:
        decoded_token = keycloak_openid.decode_token(token, validate=True)
        username = decoded_token["preferred_username"]
        issuer = decoded_token["iss"]
    except Exception as exc:
        # The client only gets a generic 401, so record the real cause here;
        # otherwise every failure is indistinguishable when debugging.
        logger.exception("Token validation failed")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
        ) from exc

    logger.info(f"Decoded token: {decoded_token}")

    # Verify the issuer claim
    expected_issuer = f"{KEYCLOAK_SERVER_URL}/realms/{KEYCLOAK_REALM}"
    # Token example -- token: {'exp': 1731303036, 'iat': 1731267036, 'jti': 'f1b71d25-4de6-4c03-b5f5-d9726b39d51f', 'iss': 'https://feast-keycloak.pimc-st.innodev.local/realms/feast-realm', 'aud': 'account', 'sub': 'ac48f45e-f26b-4380-bde8-e752febb6d18', 'typ': 'Bearer', 'azp': 'feast-client-id', 'session_state': 'b36cd197-247d-447e-9f3d-6cf1fecae7d6', 'acr': '1', 'allowed-origins': ['https://feast-frontend.pimc-st.innodev.local', '/*', 'http://localhost:5173'], 'realm_access': {'roles': ['default-roles-feast-realm', 'offline_access', 'uma_authorization']}, 'resource_access': {'account': {'roles': ['manage-account', 'manage-account-links', 'view-profile']}}, 'scope': 'profile email', 'sid': 'b36cd197-247d-447e-9f3d-6cf1fecae7d6', 'email_verified': False, 'name': 'A B', 'preferred_username': 'my_username', 'given_name': 'A', 'family_name': 'B', 'email': '[email protected]'}
    logger.info(f"XXX_ issuer={issuer}")
    logger.info(f"XXX_ expected_issuer={expected_issuer}")
    if issuer != expected_issuer:
        # Raised outside the try block so this specific detail reaches the
        # client instead of being rewritten to "Invalid token".
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid issuer")

    logger.info(f"username: {username}")
    return decoded_token
在我的 main.py 中,我有以下代码
from fastapi import Depends, FastAPI, HTTPException, status, Security
from .keycloak import verify_token, oauth2_scheme, KEYCLOAK_CLIENT_ID, KEYCLOAK_CLIENT_SECRET, TOKEN_URL
app = FastAPI()
@app.post("/project", response_model=schemas.Project, tags=["project",])
def create_project(project: schemas.CreateProject, db: Session = Depends(get_db), payload: dict = Security(verify_token)):
    # Protected endpoint: `Security(verify_token)` runs the token dependency
    # first, so `payload` holds whatever `verify_token` returns and the request
    # is rejected when that dependency raises its 401.
    # The handler body is elided in this snippet.
    ...
@app.post("/login")
def get_token(body: schemas.Login):
    """Exchange a username/password for tokens at the Keycloak token endpoint.

    Args:
        body: Login payload providing ``login`` and ``password``.

    Returns:
        A ``JSONResponse`` mirroring Keycloak's status code and JSON body.

    Raises:
        HTTPException: 502 when the Keycloak token endpoint cannot be reached.
    """
    data = {
        "grant_type": "password",  # TODO: clarify grant_type client_credentials (requires only client id and secret or password - requires password and login)
        "client_id": KEYCLOAK_CLIENT_ID,
        "client_secret": KEYCLOAK_CLIENT_SECRET,
        "password": body.password,
        "username": body.login,
    }
    try:
        # Without a timeout, a stalled Keycloak would block this worker forever.
        # NOTE(review): verify=False disables TLS certificate verification;
        # confirm this is limited to local/self-signed setups.
        response = requests.post(TOKEN_URL, data=data, verify=False, timeout=10)
    except requests.RequestException as exc:
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail="Could not reach Keycloak",
        ) from exc

    try:
        content = response.json()
    except ValueError:
        # A proxy or gateway error page may not be JSON; pass the text through
        # instead of crashing with a 500.
        content = {"detail": response.text}
    return JSONResponse(status_code=response.status_code, content=content)
我通过 /login 方法获取令牌,然后像下面这样在请求中使用该令牌:
请求
/project
我收到了一个错误:
"Invalid token"(无效令牌)
该如何修复这个错误?
通过以下 3 个步骤即可验证令牌:
Step 1:从 Keycloak 获取用于验证 JWT 令牌签名的公钥证书。
Step 2:将证书转换为可用的公钥格式。
Step 3:使用公钥解码并验证 JWT 令牌签名,同时跳过受众(audience)和发行者(issuer)检查。
# Step 1: Get public key for signature verification
certs_url = f"{self.keycloak_url}/realms/{self.realm}/protocol/openid-connect/certs"
certs_response = requests.get(certs_url, timeout=10)
certs_response.raise_for_status()
public_key_data = certs_response.json()
# The certs endpoint can publish several keys (for example a signing key and
# an encryption key) in no guaranteed order, so pick the one whose `kid`
# matches the token header instead of assuming it is the first entry.
token_kid = jwt.get_unverified_header(token).get("kid")
signing_key = next(
    (key for key in public_key_data["keys"] if key.get("kid") == token_kid),
    None,
)
if signing_key is None:
    raise JWTError(f"No Keycloak key matches token kid: {token_kid}")
certificate_pem = f"-----BEGIN CERTIFICATE-----\n{signing_key['x5c'][0]}\n-----END CERTIFICATE-----"
# Step 2: Convert certificate to public key
cert = x509.load_pem_x509_certificate(certificate_pem.encode('utf-8'), default_backend())
public_key = cert.public_key().public_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PublicFormat.SubjectPublicKeyInfo
)
# Step 3: Use jwt.decode only for signature verification, disabling audience and issuer checks
jwt.decode(
    token,
    public_key,
    algorithms=['RS256'],
    options={
        'verify_aud': False,
        'verify_iss': False
    }
)
# Local stack: a PostgreSQL database plus a Keycloak server that stores its
# data in that database.
# NOTE(review): recent Docker Compose releases treat the top-level `version`
# key as obsolete - confirm whether it can be dropped.
version: '3.8'
services:
  postgres:
    image: postgres:15.6
    container_name: postgres_db
    restart: always
    ports:
      - "5432:5432"
    volumes:
      # Named volume (declared at the bottom) mounted at the data directory.
      - postgres_data:/var/lib/postgresql/data
    environment:
      # Must match KC_DB_URL / KC_DB_USERNAME / KC_DB_PASSWORD below.
      POSTGRES_DB: keycloak
      POSTGRES_USER: keycloak
      POSTGRES_PASSWORD: password
  keycloak_web:
    image: quay.io/keycloak/keycloak:26.0.5
    container_name: keycloak_web
    environment:
      KC_DB: postgres
      # Host name `postgres` is the service defined above.
      KC_DB_URL: jdbc:postgresql://postgres:5432/keycloak
      KC_DB_USERNAME: keycloak
      KC_DB_PASSWORD: password
      KC_HOSTNAME: localhost
      KC_HOSTNAME_STRICT: false
      # NOTE(review): this option may no longer be recognised by the hostname
      # configuration of this Keycloak version - confirm against the release notes.
      KC_HOSTNAME_STRICT_HTTPS: false
      KC_LOG_LEVEL: info
      KC_METRICS_ENABLED: true
      KC_HEALTH_ENABLED: true
      # NOTE(review): presumably the initial admin account; Keycloak 26 appears
      # to prefer KC_BOOTSTRAP_ADMIN_USERNAME / KC_BOOTSTRAP_ADMIN_PASSWORD - confirm.
      # Placeholder credentials: do not reuse outside a local setup.
      KEYCLOAK_ADMIN: admin
      KEYCLOAK_ADMIN_PASSWORD: admin
    command: start-dev
    depends_on:
      - postgres
    ports:
      - 8080:8080
volumes:
  postgres_data:
realm
fastapi-realm
user
username: user1
password: 1234
pip install fastapi uvicorn requests python-keycloak python-jose cryptography
api-server.py
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from starlette.responses import JSONResponse
from keycloak import KeycloakOpenID
from jose import jwt, JWTError
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import requests
import json
app = FastAPI()
# Configuration variables
# Base URL of the Keycloak server (port 8080 is the one published by the
# compose file).
KEYCLOAK_URL = 'http://localhost:8080'
# Realm that issues the tokens and publishes the certs used for validation.
REALM = 'fastapi-realm'
# Token endpoint of that realm, used by the /login route.
TOKEN_URL = f"{KEYCLOAK_URL}/realms/{REALM}/protocol/openid-connect/token"
# Client id sent with the password grant in /login (no client secret is sent).
KEYCLOAK_CLIENT_ID = 'admin-cli'
# Keycloak service class
class KeycloakService:
    """Validate Keycloak-issued JWTs against the realm's published certificates."""

    def __init__(self):
        self.keycloak_url = KEYCLOAK_URL
        self.realm = REALM

    @staticmethod
    def _audience_allowed(audience, client_id):
        """Return True when the ``aud`` claim is acceptable for *client_id*.

        ``aud`` may be absent, a single string, or a list of strings; it is
        accepted when absent or when any entry is ``"account"`` or *client_id*.
        """
        if audience is None:
            return True
        audiences = [audience] if isinstance(audience, str) else list(audience)
        return any(entry in ("account", client_id) for entry in audiences)

    @staticmethod
    def _select_signing_key(keys, kid):
        """Return the JWK matching *kid*, or None when no key matches.

        When the token carries no ``kid``, fall back to the first key that is
        not marked for another use than signing.
        """
        if kid is not None:
            return next((key for key in keys if key.get("kid") == kid), None)
        return next((key for key in keys if key.get("use", "sig") == "sig"), None)

    def decode_and_print_token(self, token):
        """Print and return the token's claims WITHOUT verifying anything.

        Returns:
            The claims dict, or None when the token cannot be parsed.
        """
        try:
            # Retrieve claims without verifying the signature or audience
            decoded_token = jwt.get_unverified_claims(token)
            print("Decoded JWT token:")
            print(json.dumps(decoded_token, indent=4))
            return decoded_token
        except JWTError as e:
            print(f"Error decoding token: {e}")
            return None

    def validate_token(self, token: str, client_id: str) -> bool:
        """Check issuer, audience and signature of *token*.

        Args:
            token: Encoded JWT to validate.
            client_id: Client id accepted in the ``aud`` claim (next to
                ``"account"``).

        Returns:
            True when every check passes, False otherwise. Failures are
            printed rather than raised.
        """
        decoded_token = self.decode_and_print_token(token)
        if not decoded_token:
            print("Failed to decode token.")
            return False
        try:
            # Extract issuer, azp, and aud directly from decoded token
            issuer = decoded_token.get('iss')
            authorized_party = decoded_token.get('azp')
            audience = decoded_token.get('aud')
            print("Issuer (iss) in token:", issuer)
            print("Authorized party (azp) in token:", authorized_party)
            print("Audience (aud) in token:", audience)

            expected_issuer = f"{self.keycloak_url}/realms/{self.realm}"
            if issuer != expected_issuer:
                print(f"Invalid issuer. Expected: {expected_issuer}, Got: {issuer}")
                return False

            # Allow `aud` to be `None` for user tokens, or match "account" or the client_id for client tokens
            if audience is None:
                print("Audience is None, assuming this is a user token without an audience.")
            elif not self._audience_allowed(audience, client_id):
                print(f"Invalid audience. Expected: 'account' or '{client_id}', Got: {audience}")
                return False

            # Step 1: Get public key for signature verification
            certs_url = f"{self.keycloak_url}/realms/{self.realm}/protocol/openid-connect/certs"
            # A timeout keeps a stalled Keycloak from hanging the request.
            certs_response = requests.get(certs_url, timeout=10)
            certs_response.raise_for_status()
            public_key_data = certs_response.json()
            # The endpoint can list several keys in no guaranteed order, so
            # match on the token's `kid` rather than taking the first entry.
            kid = jwt.get_unverified_header(token).get('kid')
            signing_key = self._select_signing_key(public_key_data.get('keys', []), kid)
            if signing_key is None or not signing_key.get('x5c'):
                print(f"No signing certificate found for key id: {kid}")
                return False
            certificate_pem = f"-----BEGIN CERTIFICATE-----\n{signing_key['x5c'][0]}\n-----END CERTIFICATE-----"

            # Step 2: Convert certificate to public key
            cert = x509.load_pem_x509_certificate(certificate_pem.encode('utf-8'), default_backend())
            public_key = cert.public_key().public_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PublicFormat.SubjectPublicKeyInfo
            )

            # Step 3: Use jwt.decode only for signature verification, disabling audience and issuer checks
            # (issuer and audience were already checked by hand above).
            jwt.decode(
                token,
                public_key,
                algorithms=['RS256'],
                options={
                    'verify_aud': False,
                    'verify_iss': False
                }
            )
            print("Token signature is valid.")
            return True
        except JWTError as e:
            print(f"Error validating token: {e}")
            return False
        except requests.RequestException as e:
            print(f"Error fetching public key: {e}")
            return False
        except ValueError as e:
            # Raised when the published certificate cannot be parsed.
            print(f"Error loading signing certificate: {e}")
            return False
keycloak_service = KeycloakService()
# Schemas
class Login(BaseModel):
    """Request body of ``POST /login``."""

    # Sent to Keycloak as the ``username`` form field.
    login: str
    # Sent to Keycloak as the ``password`` form field.
    password: str
class VerifyToken(BaseModel):
    """Request body of ``POST /verify``."""

    # Encoded JWT to validate.
    token: str
    # Client id passed to ``KeycloakService.validate_token`` for the audience check.
    client_id: str
# Login endpoint
@app.post("/login")
def get_token(body: Login):
    """Exchange a username/password for tokens via Keycloak's password grant.

    Args:
        body: Login payload providing ``login`` and ``password``.

    Returns:
        A ``JSONResponse`` with Keycloak's token payload on success.

    Raises:
        HTTPException: 502 when Keycloak cannot be reached; otherwise
            Keycloak's own status code with its response body as detail.
    """
    data = {
        "grant_type": "password",
        "client_id": KEYCLOAK_CLIENT_ID,
        "username": body.login,
        "password": body.password,
    }
    try:
        # Without a timeout, a stalled Keycloak would block this worker forever.
        response = requests.post(TOKEN_URL, data=data, verify=False, timeout=10)
    except requests.RequestException as exc:
        raise HTTPException(status_code=502, detail="Could not reach Keycloak") from exc

    try:
        payload = response.json()
    except ValueError:
        # Error pages from a proxy may not be JSON; forward the raw text
        # instead of failing with a 500.
        payload = {"error": response.text}

    if response.status_code == 200:
        return JSONResponse(status_code=200, content=payload)
    raise HTTPException(status_code=response.status_code, detail=payload)
# Verify endpoint
@app.post("/verify")
def verify_token(body: VerifyToken):
    """Report whether the submitted token passes ``keycloak_service.validate_token``."""
    return {"valid": keycloak_service.validate_token(body.token, body.client_id)}
uvicorn api-server:app --reload
API document URL
http://localhost:8000/docs#/
URL
POST http://localhost:8000/login
Input Body
{
"login": "user1",
"password": "1234"
}
Script
用于分配 user_token 变量
// Postman post-response script: store the access token returned by /login in
// the `user_token` environment variable so later requests can send
// {{user_token}}. Uses the `pm.*` API instead of the legacy `postman.*` /
// `responseBody` globals.
let jsonData = null;
try {
    jsonData = pm.response.json();
} catch (err) {
    // A non-JSON body (for example a gateway error page) must not abort the script.
    console.error("Response body is not valid JSON");
}
if (jsonData && jsonData.access_token) {
    pm.environment.set("user_token", jsonData.access_token);
} else if (jsonData) {
    console.error("access_token not found in the response");
}
URL
POST http://localhost:8000/verify
Input Body
{
"token": "{{user_token}}",
"client_id": "admin-cli"
}
因此,用户令牌验证通过,返回值为 true。
{
"valid": true
}