我正在项目中使用 FastAPI,并通过 OAuth2 与 Okta 集成身份验证。在 Swagger 页面中,“Authorize”(授权)按钮并不会对令牌进行验证。
同样的验证在各个 API 调用层面是有效的,但在页面顶部的主授权按钮上不起作用。即使我指定了错误的令牌,它也会说“已验证”。
支持此身份验证的代码是:okta_authentication.py
import base64
import requests
from fastapi import HTTPException, Request
from starlette import status
from app.core.runtime_config import runtime_config
from app.services import secretsmanager
def fetch_okta_client_creds_from_sm():
    """
    Function to retrieve the Okta client credentials from Secrets Manager.
    Returns:
    The value returned by ``retrieve_okta_creds`` for the current environment
    (selected via ``runtime_config.is_test_env``). Callers in this module
    index it with ``[0]`` (used as the client id) and ``[1]``
    (presumably the client secret -- confirm against SecretsManager).
    """
    secrets_client = secretsmanager.SecretsManager()
    return secrets_client.retrieve_okta_creds(runtime_config.is_test_env)
def generate_swagger_description() -> str:
    """
    Function to generate the swagger description for the Okta authentication.

    The description is an HTML snippet containing a link to the Okta
    ``/authorize`` endpoint so the user can log in and obtain a token to paste
    into the Swagger UI authorization dialog.

    Returns:
    str: The swagger description.
    """
    # Local import keeps this block self-contained without touching the
    # file-level import list.
    from urllib.parse import quote

    okta_url = str(runtime_config.okta_url)
    okta_client_id = quote(str(fetch_okta_client_creds_from_sm()[0]), safe="")
    app_name = "data-persistence-infrastructure"
    redirect_uri = (
        f"{str(runtime_config.redirect_url_prefix)}/data/v1/infrastructure/misc/auth/callback"
    )

    # Build the query string piece by piece. The redirect URI is itself a URL,
    # so it must be percent-encoded before being embedded as a parameter value.
    query = "&".join(
        (
            "response_type=token",
            "scope=openid%20email%20profile",
            "state=service_user",
            f"nonce={app_name}",
            f"client_id={okta_client_id}",
            f"redirect_uri={quote(redirect_uri, safe='')}",
            "sessionToken=",
            f"audience={okta_client_id}",
        )
    )
    # No whitespace may follow the "?": a space there produces an invalid URL.
    href = f"{okta_url}/authorize?{query}"

    desc = (
        f'<a href="{href}" target="_blank"><b>Click here</b></a> to log in '
        "to Okta and get your "
        "access token.<br/>"
        "Copy and paste the token into the box below."
    )
    return desc
def get_token_from_request(request: Request) -> str:
    """
    Function to extract the token from the request.

    The ``Authorization`` header is expected to look like ``"<scheme> <token>"``.

    Parameters:
    request (Request): The request object.
    Returns:
    str: The access token.
    Raises:
    HTTPException: 401 if the header is missing or carries no token part.
    """
    auth = request.headers.get("Authorization", "")
    parts = auth.split()
    # A missing header or a header without a token part is a client error;
    # report it as 401 rather than letting KeyError/IndexError become a 500.
    if len(parts) < 2:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Missing or malformed Authorization header",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return parts[1]
def fetch_user_details(request: Request) -> dict:
    """
    Function to fetch the user details from the Okta user info endpoint.

    The incoming ``Authorization`` header is forwarded unchanged to
    ``runtime_config.user_info_url``.

    Parameters:
    request (Request): The request object.
    Returns:
    user_details (dict): User details from token.
    Raises:
    HTTPException: 401 if the header is missing or the endpoint does not
    answer with 200; 503 if the endpoint cannot be reached.
    """
    auth = request.headers.get("Authorization")
    if not auth:
        # Without credentials there is nothing to look up.
        raise HTTPException(status_code=401, detail="Invalid User")

    try:
        # requests never times out by default; bound the call so a stalled
        # identity provider cannot hang the request indefinitely.
        response = requests.get(
            runtime_config.user_info_url,
            headers={"Authorization": auth},
            timeout=10,
        )
    except requests.RequestException as exc:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Unable to reach the identity provider",
        ) from exc

    if response.status_code != status.HTTP_200_OK:
        raise HTTPException(status_code=401, detail="Invalid User")
    return response.json()
def encode_string_to_base64(string: str) -> str:
    """
    Function to encode a string to base64.

    The text is encoded as UTF-8 before base64 encoding, so non-ASCII input
    is accepted; for pure-ASCII input the result is the same as with an
    ASCII encoding.

    Parameters:
    string (str): The string to encode.
    Returns:
    str: The base64 encoded string.
    """
    raw_bytes = string.encode("utf-8")
    # Base64 output only ever contains ASCII characters.
    return base64.b64encode(raw_bytes).decode("ascii")
def call_introspect_api(token: str) -> bool:
    """
    Function to call the Okta introspect API to validate the access token.

    The client credentials are sent as HTTP Basic authentication and the
    token is posted as form data to ``runtime_config.okta_introspect_url``.

    Parameters:
    token (str): The access token.
    Returns:
    bool: True if the token is valid, False otherwise.
    Raises:
    HTTPException: 503 if the introspection endpoint cannot be reached.
    """
    okta_creds = fetch_okta_client_creds_from_sm()
    base_64_code = encode_string_to_base64(f"{okta_creds[0]}:{okta_creds[1]}")
    headers = {"Authorization": f"Basic {base_64_code}"}
    # This function validates an access token, so hint the matching type.
    data = {"token": token, "token_type_hint": "access_token"}

    try:
        # requests never times out by default; bound the call explicitly.
        response = requests.post(
            runtime_config.okta_introspect_url,
            headers=headers,
            data=data,
            timeout=10,
        )
    except requests.RequestException as exc:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Unable to reach the identity provider",
        ) from exc

    if response.status_code != 200:
        return False
    try:
        payload = response.json()
    except ValueError:
        # A non-JSON body cannot confirm the token; treat it as not active.
        return False
    # Fail closed when the "active" field is absent.
    return bool(payload.get("active", False))
def verify_access_token(request: Request) -> str:
    """
    Function to verify the access token and return the user email.

    The token is taken from the request, the user details are fetched from
    the user info endpoint, and the token is then checked with the
    introspect API.

    Parameters:
    request (Request): The request object.
    Returns:
    str: The user email.
    Raises:
    HTTPException: 401 if the token is rejected by introspection or the
    user details contain no email (in addition to whatever the helper
    functions raise).
    """
    # Extract the token from the request
    token = get_token_from_request(request)
    user_info = fetch_user_details(request)

    if not call_introspect_api(token):
        raise HTTPException(
            status_code=401, detail="Invalid Request. Please Re-authenticate before proceeding"
        )

    # The "email" claim may be absent; report that as an authentication
    # failure instead of letting a KeyError surface as a 500.
    email = user_info.get("email")
    if not email:
        raise HTTPException(status_code=401, detail="Invalid User")
    return email
我在中间件中调用它:
import logging
from fastapi import Request
from fastapi.security import HTTPBearer
from starlette.middleware.base import BaseHTTPMiddleware
from app.common.okta_authentication import generate_swagger_description, verify_access_token
# Configure logging at INFO level when this module is imported and create the
# module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Bearer-token security scheme invoked manually by the middleware below.
# NOTE: generate_swagger_description() is evaluated once, at import time, so
# whatever it fetches (it reads the Okta client id) happens on module import.
security = HTTPBearer(
scheme_name="Okta Authentication", description=generate_swagger_description()
)
class AuthMiddleware(BaseHTTPMiddleware):
    """
    Middleware that enforces Okta bearer-token authentication on selected
    path prefixes and stores the authenticated user's email on
    ``request.state.user_email``.
    """

    async def dispatch(self, request: Request, call_next):
        """
        Authenticate the request when its path is protected, then forward it.

        Parameters:
        request (Request): The incoming request.
        call_next: Callable that passes the request to the next handler.
        Returns:
        The downstream response, or a JSON error response when
        authentication fails.
        """
        # Local imports keep this block self-contained without touching the
        # file-level import list.
        from fastapi import HTTPException
        from starlette.concurrency import run_in_threadpool
        from starlette.responses import JSONResponse

        # Check if the request path requires authentication
        protected_prefixes = (
            "/data/v1/infrastructure/clusters",
            "/data/v2/infrastructure",
        )
        if not request.url.path.startswith(protected_prefixes):
            return await call_next(request)

        try:
            # Manually invoke the dependency, so that the request is processed
            # by the security dependency.
            await security(request)
            # verify_access_token performs blocking HTTP calls; run it in a
            # worker thread so the event loop is not stalled.
            user_email = await run_in_threadpool(verify_access_token, request)
        except HTTPException as exc:
            # Exceptions raised inside a middleware are not routed through the
            # application's HTTPException handler, so build the error response
            # here instead of letting it surface as a 500.
            return JSONResponse(
                status_code=exc.status_code,
                content={"detail": exc.detail},
                headers=exc.headers,
            )

        request.state.user_email = user_email
        return await call_next(request)
我还在路由器的 `__init__.py` 文件中调用了生成描述的方法(与上面的片段相同):
# Bearer-token security scheme; the description is computed once, when this
# module is imported.
security = HTTPBearer(scheme_name="Okta Authentication", description=generate_swagger_description())
# Router created without any router-level dependencies.
api_router_without_auth = APIRouter()
# Router that attaches the bearer-token security dependency to its routes.
api_router = APIRouter(dependencies=[Depends(security)])
我在这里遗漏了什么?任何帮助都将不胜感激。提前致谢(TIA)。
即使我指定了错误的令牌,它也会说“已验证”。
这是设计使然,但我同意这个 UI 可能会令人困惑。
Swagger UI 的“Available authorizations”(可用授权)对话框只是供您输入身份验证令牌、以便在后续 API 调用中使用的地方。此对话框不会向后端验证令牌。这样设计的目的是让您既可以输入有效令牌,也可以输入无效令牌,从而分别测试正向(成功)和反向(失败)场景。