使用 clientid/clientsecret 验证 FastAPI

问题描述 投票:0回答:2

我正在尝试使用 clientid-clientsecret 设置 FastAPI (0.71.0) 身份验证。

我配置了

OAuth2AuthorizationCodeBearer
,显然从 swagger (
/docs
) 端点来看,它看起来很好,它要求提供 client-id 和 client-secret 进行身份验证。

auth = OAuth2PasswordBearer(
    authorizationUrl=AUTH_URL,
    tokenUrl=TOKEN_URL,
)

agent = FastAPI(
    description=_DESCRIPTION,
    version=VERSION,
    dependencies=[Depends(auth)],
    middleware=middlewares,
    root_path=os.getenv('BASEPATH', '/'),
    swagger_ui_init_oauth={
        'usePkceWithAuthorizationCodeGrant': True,
        'scopes': 'openid profile email'
    }
)

但是直接调用 API,我可以使用任何 Bearer 进行访问,例如:

import requests

url = 'http://localhost:8080/test'
headers = {'Authorization': 'Bearer BADTOKEN', 'Content-Type': 'application/json', 'Accept': 'application/json'}
response = requests.get(url=url, params={}, headers=headers)

response.ok = True
,所以我可能在FastAPI设置中遗漏了一些东西,但看不到在哪里。

这是正确的身份验证流程吗?

PS:我想要实现的是服务与服务之间的通信,因此普通用户无法访问 API

keycloak fastapi
2个回答
2
投票

在查看了

OAuth2AuthorizationCodeBearer
实现之后,我最终添加了一个方法来检查不记名令牌的有效性:

public_key = requests.get(ISSUER_URL).json().get('public_key')
key = '-----BEGIN PUBLIC KEY-----\n' + public_key + '\n-----END PUBLIC KEY-----'

oauth = OAuth2AuthorizationCodeBearer(
        authorizationUrl=AUTH_URL,
        tokenUrl=TOKEN_URL,
)
async def auth(token: str | None = Depends(oauth)):
    try:
        jwt.decode(
            token,
            key=key,
            options={
                "verify_signature": True,
                "verify_aud": False,
                "verify_iss": ISSUER_URL
            }
        )
    except JOSEError as e:  # catches any exception
        raise HTTPException(
            status_code=401,
            detail=str(e))

0
投票
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2AuthorizationCodeBearer
import uvicorn
from jose import JOSEError,jwt
import requests

ISSUER_URL="http://localhost:8083/realms/realmname"
app = FastAPI()


oauth_2_scheme = OAuth2AuthorizationCodeBearer(
    tokenUrl="http://localhost:8083/realms/realmname/protocol/openid-connect/token",
    authorizationUrl="http://localhost:8083/realms/realmname/protocol/openid-connect/auth")

public_key = requests.get(ISSUER_URL).json().get('public_key')
key = '-----BEGIN PUBLIC KEY-----\n' + public_key + '\n-----END PUBLIC KEY-----'

def valid_access_token(token: str | None = Depends(oauth_2_scheme)):
    try:
        jwt.decode(
            token,
            key=key,
            options={
                "verify_signature": True,
                "verify_aud": False,
                "verify_iss": ISSUER_URL
            }
        )
    except JOSEError as e:  # catches any exception
        raise HTTPException(
            status_code=401,
            detail=str(e))

@app.get("/public")
def get_public():
    return {"message": "This endpoint is public"}


@app.get("/private", dependencies=[Depends(valid_access_token)])
def get_private():
    return {"message": "This endpoint is private"}


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8034)
© www.soinside.com 2019 - 2024. All rights reserved.