我正在尝试使用 clientid-clientsecret 设置 FastAPI (0.71.0) 身份验证。
我配置了
OAuth2AuthorizationCodeBearer
,显然从 swagger (/docs
) 端点来看,它看起来很好,它要求提供 client-id 和 client-secret 进行身份验证。
auth = OAuth2PasswordBearer(
authorizationUrl=AUTH_URL,
tokenUrl=TOKEN_URL,
)
agent = FastAPI(
description=_DESCRIPTION,
version=VERSION,
dependencies=[Depends(auth)],
middleware=middlewares,
root_path=os.getenv('BASEPATH', '/'),
swagger_ui_init_oauth={
'usePkceWithAuthorizationCodeGrant': True,
'scopes': 'openid profile email'
}
)
但是直接调用 API,我可以使用任何 Bearer 进行访问,例如:
import requests
url = 'http://localhost:8080/test'
headers = {'Authorization': 'Bearer BADTOKEN', 'Content-Type': 'application/json', 'Accept': 'application/json'}
response = requests.get(url=url, params={}, headers=headers)
和
response.ok = True
,所以我可能在FastAPI设置中遗漏了一些东西,但看不到在哪里。
这是正确的身份验证流程吗?
PS:我想要实现的是服务与服务之间的通信,因此普通用户无法访问 API
OAuth2AuthorizationCodeBearer
实现之后,我最终添加了一个方法来检查不记名令牌的有效性:
public_key = requests.get(ISSUER_URL).json().get('public_key')
key = '-----BEGIN PUBLIC KEY-----\n' + public_key + '\n-----END PUBLIC KEY-----'
oauth = OAuth2AuthorizationCodeBearer(
authorizationUrl=AUTH_URL,
tokenUrl=TOKEN_URL,
)
async def auth(token: str | None = Depends(oauth)):
try:
jwt.decode(
token,
key=key,
options={
"verify_signature": True,
"verify_aud": False,
"verify_iss": ISSUER_URL
}
)
except JOSEError as e: # catches any exception
raise HTTPException(
status_code=401,
detail=str(e))
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2AuthorizationCodeBearer
import uvicorn
from jose import JOSEError,jwt
import requests
ISSUER_URL="http://localhost:8083/realms/realmname"
app = FastAPI()
oauth_2_scheme = OAuth2AuthorizationCodeBearer(
tokenUrl="http://localhost:8083/realms/realmname/protocol/openid-connect/token",
authorizationUrl="http://localhost:8083/realms/realmname/protocol/openid-connect/auth")
public_key = requests.get(ISSUER_URL).json().get('public_key')
key = '-----BEGIN PUBLIC KEY-----\n' + public_key + '\n-----END PUBLIC KEY-----'
def valid_access_token(token: str | None = Depends(oauth_2_scheme)):
try:
jwt.decode(
token,
key=key,
options={
"verify_signature": True,
"verify_aud": False,
"verify_iss": ISSUER_URL
}
)
except JOSEError as e: # catches any exception
raise HTTPException(
status_code=401,
detail=str(e))
@app.get("/public")
def get_public():
return {"message": "This endpoint is public"}
@app.get("/private", dependencies=[Depends(valid_access_token)])
def get_private():
return {"message": "This endpoint is private"}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8034)