keycloak 中的“无效令牌”

问题描述 投票:0回答:1

我正在尝试在我的 FastAPI 应用程序中使用 keycloak 我的代码

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from keycloak import KeycloakOpenID
import requests
import logging
import os

from .config import settings


# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Keycloak configuration
KEYCLOAK_SERVER_URL = settings.KEYCLOAK_SERVER_URL
KEYCLOAK_REALM = settings.KEYCLOAK_REALM
KEYCLOAK_CLIENT_ID = settings.KEYCLOAK_CLIENT_ID
KEYCLOAK_CLIENT_SECRET = settings.KEYCLOAK_CLIENT_SECRET
ALGORITHM = "RS256"
TOKEN_URL = f"{KEYCLOAK_SERVER_URL}/realms/fastapi-realm/protocol/openid-connect/token"


# Initialize KeycloakOpenID
keycloak_openid = KeycloakOpenID(
    server_url=f"{KEYCLOAK_SERVER_URL}",
    client_id=KEYCLOAK_CLIENT_ID,
    realm_name=KEYCLOAK_REALM,
    client_secret_key=KEYCLOAK_CLIENT_SECRET,
    verify=False
)
config_well_known = keycloak_openid.well_known()

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

def verify_token(token: str = Depends(oauth2_scheme)):
    try:
        decoded_token = keycloak_openid.decode_token(
            token,validate=True,
        )
        username = decoded_token['preferred_username']
        logger.info(f"Decoded token: {decoded_token}")
        # Verify the issuer claim
        issuer = decoded_token["iss"]

        expected_issuer = f"{KEYCLOAK_SERVER_URL}/realms/{KEYCLOAK_REALM}"
        I# Token example -- token: {'exp': 1731303036, 'iat': 1731267036, 'jti': 'f1b71d25-4de6-4c03-b5f5-d9726b39d51f', 'iss': 'https://feast-keycloak.pimc-st.innodev.local/realms/feast-realm', 'aud': 'account', 'sub': 'ac48f45e-f26b-4380-bde8-e752febb6d18', 'typ': 'Bearer', 'azp': 'feast-client-id', 'session_state': 'b36cd197-247d-447e-9f3d-6cf1fecae7d6', 'acr': '1', 'allowed-origins': ['https://feast-frontend.pimc-st.innodev.local', '/*', 'http://localhost:5173'], 'realm_access': {'roles': ['default-roles-feast-realm', 'offline_access', 'uma_authorization']}, 'resource_access': {'account': {'roles': ['manage-account', 'manage-account-links', 'view-profile']}}, 'scope': 'profile email', 'sid': 'b36cd197-247d-447e-9f3d-6cf1fecae7d6', 'email_verified': False, 'name': 'A B', 'preferred_username': 'my_username', 'given_name': 'A', 'family_name': 'B', 'email': '[email protected]'}
        logger.info(f"XXX_ issuer={issuer}")
        logger.info(f"XXX_ expected_issuer={expected_issuer}")
        if issuer != expected_issuer:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid issuer")
        
        logger.info(f"username: {username}")
        return decoded_token
    except Exception as e:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")

在我的 main.py 中,我有以下代码

from fastapi import Depends, FastAPI, HTTPException, status, Security


from .keycloak import verify_token, oauth2_scheme, KEYCLOAK_CLIENT_ID, KEYCLOAK_CLIENT_SECRET, TOKEN_URL

app = FastAPI()

@app.post("/project", response_model=schemas.Project, tags=["project",])
def create_project(project: schemas.CreateProject, db: Session = Depends(get_db), payload: dict = Security(verify_token)):
    ...


@app.post("/login")
def get_token(body: schemas.Login):
    data = {
        "grant_type": "password", # TODO: clarify grant_type client_credentials (requires only client id and secret or password - requires password and login)
        "client_id": KEYCLOAK_CLIENT_ID,
        "client_secret": KEYCLOAK_CLIENT_SECRET,
        "password": body.password,
        "username": body.login
    }
    response = requests.post(TOKEN_URL, data=data,  verify=False)
    return JSONResponse(status_code=response.status_code, content=response.json())

我正在通过 /login 方法获取令牌 然后我像这样应用令牌: enter image description here

请求

/project
我有一个错误:

“无效令牌”

如何修复错误?

keycloak fastapi
1个回答
0
投票

这3步即可验证token

Step 1
:从Keycloak中获取公钥证书来验证 JWT 令牌签名。

Step 2
:将证书转换为可用的公钥格式。

Step 3
:使用公钥解码并验证 JWT 令牌签名,同时跳过受众和发行者检查。

    # Step 1: Get public key for signature verification
    certs_url = f"{self.keycloak_url}/realms/{self.realm}/protocol/openid-connect/certs"
    certs_response = requests.get(certs_url)
    certs_response.raise_for_status()
    public_key_data = certs_response.json()
    certificate_pem = f"-----BEGIN CERTIFICATE-----\n{public_key_data['keys'][0]['x5c'][0]}\n-----END CERTIFICATE-----"

    # Step 2: Convert certificate to public key
    cert = x509.load_pem_x509_certificate(certificate_pem.encode('utf-8'), default_backend())
    public_key = cert.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo
    )

    # Step 3: Use jwt.decode only for signature verification, disabling audience and issuer checks
    jwt.decode(
        token,
        public_key,
        algorithms=['RS256'],
        options={
            'verify_aud': False,
            'verify_iss': False
        }
    )

通过 Docker Compose 启动 Key cloak

version: '3.8'

services:
  postgres:
    image: postgres:15.6
    container_name: postgres_db
    restart: always
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
    environment:
      POSTGRES_DB: keycloak
      POSTGRES_USER: keycloak
      POSTGRES_PASSWORD: password

  keycloak_web:
    image: quay.io/keycloak/keycloak:26.0.5
    container_name: keycloak_web
    environment:
      KC_DB: postgres
      KC_DB_URL: jdbc:postgresql://postgres:5432/keycloak
      KC_DB_USERNAME: keycloak
      KC_DB_PASSWORD: password

      KC_HOSTNAME: localhost
      KC_HOSTNAME_STRICT: false
      KC_HOSTNAME_STRICT_HTTPS: false

      KC_LOG_LEVEL: info
      KC_METRICS_ENABLED: true
      KC_HEALTH_ENABLED: true
      KEYCLOAK_ADMIN: admin
      KEYCLOAK_ADMIN_PASSWORD: admin
    command: start-dev
    depends_on:
      - postgres
    ports:
      - 8080:8080

volumes:
  postgres_data:

创建领域和用户

realm

fastapi-realm

user

username: user1
password: 1234

enter image description here

使用 conda 在 Python 3.12 中安装依赖项

pip install fastapi uvicorn requests python-keycloak python-jose cryptography

enter image description here

FastAPI 服务器代码

api-server.py

from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from starlette.responses import JSONResponse
from keycloak import KeycloakOpenID
from jose import jwt, JWTError
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization

import requests
import json

app = FastAPI()

# Configuration variables
KEYCLOAK_URL = 'http://localhost:8080'
REALM = 'fastapi-realm'
TOKEN_URL = f"{KEYCLOAK_URL}/realms/{REALM}/protocol/openid-connect/token"
KEYCLOAK_CLIENT_ID = 'admin-cli'

# Keycloak service class
class KeycloakService:
    def __init__(self):
        self.keycloak_url = KEYCLOAK_URL
        self.realm = REALM

    def decode_and_print_token(self, token):
        try:
            # Retrieve claims without verifying the signature or audience
            decoded_token = jwt.get_unverified_claims(token)
            print("Decoded JWT token:")
            print(json.dumps(decoded_token, indent=4))
            return decoded_token
        except JWTError as e:
            print(f"Error decoding token: {e}")
            return None
        
    def validate_token(self, token: str, client_id: str) -> bool:
        decoded_token = self.decode_and_print_token(token)
        if not decoded_token:
            print("Failed to decode token.")
            return False

        try:
            # Extract issuer, azp, and aud directly from decoded token
            issuer = decoded_token.get('iss')
            authorized_party = decoded_token.get('azp')
            audience = decoded_token.get('aud')

            print("Issuer (iss) in token:", issuer)
            print("Authorized party (azp) in token:", authorized_party)
            print("Audience (aud) in token:", audience)

            expected_issuer = f"{self.keycloak_url}/realms/{self.realm}"
            if issuer != expected_issuer:
                print(f"Invalid issuer. Expected: {expected_issuer}, Got: {issuer}")
                return False

            # Allow `aud` to be `None` for user tokens, or match "account" or the client_id for client tokens
            if audience is None:
                print("Audience is None, assuming this is a user token without an audience.")
            elif audience not in ["account", client_id]:
                print(f"Invalid audience. Expected: 'account' or '{client_id}', Got: {audience}")
                return False

            # Step 1: Get public key for signature verification
            certs_url = f"{self.keycloak_url}/realms/{self.realm}/protocol/openid-connect/certs"
            certs_response = requests.get(certs_url)
            certs_response.raise_for_status()
            public_key_data = certs_response.json()
            certificate_pem = f"-----BEGIN CERTIFICATE-----\n{public_key_data['keys'][0]['x5c'][0]}\n-----END CERTIFICATE-----"

            # Step 2: Convert certificate to public key
            cert = x509.load_pem_x509_certificate(certificate_pem.encode('utf-8'), default_backend())
            public_key = cert.public_key().public_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PublicFormat.SubjectPublicKeyInfo
            )

            # Step 3: Use jwt.decode only for signature verification, disabling audience and issuer checks
            jwt.decode(
                token,
                public_key,
                algorithms=['RS256'],
                options={
                    'verify_aud': False,
                    'verify_iss': False
                }
            )
            print("Token signature is valid.")
            return True
        except JWTError as e:
            print(f"Error validating token: {e}")
            return False
        except requests.RequestException as e:
            print(f"Error fetching public key: {e}")
            return False

keycloak_service = KeycloakService()

# Schemas
class Login(BaseModel):
    login: str
    password: str

class VerifyToken(BaseModel):
    token: str
    client_id: str

# Login endpoint
@app.post("/login")
def get_token(body: Login):
    data = {
        "grant_type": "password",
        "client_id": KEYCLOAK_CLIENT_ID,
        "username": body.login,
        "password": body.password
    }
    response = requests.post(TOKEN_URL, data=data, verify=False)
    if response.status_code == 200:
        return JSONResponse(status_code=200, content=response.json())
    else:
        raise HTTPException(status_code=response.status_code, detail=response.json())

# Verify endpoint
@app.post("/verify")
def verify_token(body: VerifyToken):
    is_valid = keycloak_service.validate_token(body.token, body.client_id)
    return {"valid": is_valid}

启动 FastAPI 服务器

uvicorn api-server:app --reload

enter image description here

API document URL

http://localhost:8000/docs#/

enter image description here

通过Postman获取Token

URL

POST http://localhost:8000/login

Input Body

{
  "login": "user1",
  "password": "1234"
}

Script
用于分配 user_token 变量

var jsonData = JSON.parse(responseBody);

if (jsonData.access_token) {
    postman.setEnvironmentVariable("user_token", jsonData.access_token);
} else {
    console.error("access_token not found in the response");
}

enter image description here

enter image description here

enter image description here

通过邮递员验证令牌

URL

POST http://localhost:8000/verify

Input Body

{
  "token": "{{user_token}}",
  "client_id": "admin-cli"
}

enter image description here

因此用户令牌已通过 OK(真)值进行验证。

代币有效结果

{
    "valid": true
}
© www.soinside.com 2019 - 2024. All rights reserved.