我有一个 Azure 函数(Azure Function),当前正在监听事件中心(Event Hub)中的一个主题。
我们会收到若干 JSON 作为事件。我们需要实现一个装饰器,用来包装执行过程中可能引发的所有异常(瞬态错误、传入负载错误),并触发另一个事件,同时保留我最初收到的消息以及所引发的异常。
from fastapi import HTTPException
from pydantic import BaseModel
from dam_common.common import settings_helper
from enum import Enum
import json
import httpx
import logging
class EventHubRequest(BaseModel):
    """Pydantic model holding the five string fields of one request.

    process_request in this file builds an instance from an event record
    and sends its dict form as the JSON body of the outgoing HTTP call.
    """

    # Filled from the record's "RequestID" key by process_request.
    IHDRequestID: str
    ProductType: str
    RequestType: str
    RequesterADGroupName: str
    # Declared as a plain string; no date/time parsing is applied here.
    TimeStamp: str
class RequestTypeValidation(Enum):
    """Enumeration of three request-type string values.

    NOTE(review): not referenced elsewhere in the visible code; presumably
    intended for checking the RequestType field -- confirm against callers.
    """

    IHD = "IHD"
    ANONYMIZED = "Anonymized"
    # The member name (STUDY) differs from its string value ("Keycoded").
    STUDY = "Keycoded"
async def main(eventHubMessage: str):
    """Decode an Event Hub message and validate/forward every record in it.

    The message is parsed as JSON. A JSON array is handled as a sequence of
    records; any other decoded value is handled as a single record. Each
    record is validated and then forwarded, strictly in order.

    Every failure is logged and swallowed, so this coroutine always returns
    None and never propagates an exception to its caller.

    Args:
        eventHubMessage: Raw JSON text of the incoming event.
    """
    try:
        # Deserialize the JSON payload
        payload = json.loads(eventHubMessage)
        logging.info(f"Processing message: {payload}")

        # Treat a lone record as a batch of one so both shapes share one loop.
        batch = payload if isinstance(payload, list) else [payload]
        for item in batch:
            validate_request(item)
            await process_request(item)

        logging.info("All requests processed successfully.")
    except json.JSONDecodeError:
        # Must precede ValueError: JSONDecodeError is a ValueError subclass.
        logging.error("Invalid JSON format in the request.")
    except ValueError as ve:
        logging.error(f"Validation error: {ve}")
    except Exception as e:
        logging.error(f"An error occurred: {e}")
# Validate input fields in the request
def validate_request(data: dict):
    """Check that an event record is a dict carrying every required field.

    Args:
        data: One decoded event record.

    Raises:
        ValueError: If ``data`` is not a dict, or if any required key is
            absent. For a missing key, the message names the first one
            missing in declaration order.
    """
    # The ``in`` operator also succeeds on strings (substring match) and on
    # lists (element match), so a non-object payload that merely contains the
    # key names would pass the check below. Other types such as int or None
    # would raise TypeError instead of a validation error. Reject them here.
    if not isinstance(data, dict):
        raise ValueError(
            f"Request must be a JSON object, got {type(data).__name__}"
        )

    required_keys = [
        "RequestID",
        "ProductType",
        "RequestType",
        "RequesterADGroupName",
        "TimeStamp",
    ]
    for key in required_keys:
        if key not in data:
            raise ValueError(f"Missing required field: {key}")
async def process_request(data):
    """Forward one event record to the access_manager API over HTTP.

    The record is converted to an EventHubRequest, serialized to a dict and
    POSTed as JSON to ``<API_URL>access_manager/send_request_v2`` with the
    function key header, using a 60 second timeout.

    Args:
        data: Mapping with the keys "RequestID", "ProductType",
            "RequestType", "RequesterADGroupName" and "TimeStamp".

    Raises:
        KeyError: If ``data`` is a dict lacking one of the keys above.
        HTTPException: With the upstream status code when the API answers
            with an error status, or with status 500 for any other failure
            during the call. In both cases the triggering exception is
            attached as ``__cause__`` so handlers can inspect or log it.
    """
    logging.info(f"Processing request: {data}")

    # Convert the incoming data to EventHubRequest class type
    eventhub_request = EventHubRequest(
        IHDRequestID=data["RequestID"],
        ProductType=data["ProductType"],
        RequestType=data["RequestType"],
        RequesterADGroupName=data["RequesterADGroupName"],
        TimeStamp=data["TimeStamp"],
    )
    full_request = eventhub_request.dict()

    api_url = settings_helper.get_value("API_URL") + "access_manager/send_request_v2"
    headers = {
        "content-type": "application/json",
        "x-functions-key": settings_helper.get_value("MASTER_KEY"),
    }

    # Make the HTTP call
    async with httpx.AsyncClient() as client:
        try:
            logging.info(f"Sending request to `{api_url}` via httpx.")
            response = await client.post(
                api_url, json=full_request, headers=headers, timeout=60.0
            )
            response.raise_for_status()
            logging.info(f"Response status: {response.status_code}")
            logging.info(f"Response content: {response.text}")
        except httpx.HTTPStatusError as exc:
            logging.error(f"HTTP error occurred: {exc.response.text}")
            # Chain explicitly so the original error is kept as __cause__.
            raise HTTPException(
                status_code=exc.response.status_code,
                detail="Failed to send request to access_manager",
            ) from exc
        except Exception as e:
            logging.error(f"Unexpected error sending request to access_manager: {e}")
            raise HTTPException(
                status_code=500, detail="Unexpected error occurred."
            ) from e
这是我的代码。我已经尝试创建一个装饰器,但它会丢失原始消息;我需要保留原始消息,并在引发异常时将其连同异常信息一起传递给装饰器。
每当我的代码引发异常时,我希望有一个装饰器将其包装起来,并把异常信息与原始消息一起保留下来。
您可以使用
raise HTTPException(...) from e
来保留原始异常,并配合自定义异常处理程序:
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
import uvicorn
import logging
# Configure root logging so that INFO-level records and above are emitted.
logging.basicConfig(level=logging.INFO)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# FastAPI application that the exception handler and route below register on.
app = FastAPI()
@app.exception_handler(HTTPException)
async def unicorn_exception_handler(request: Request, exc: HTTPException):
    """Turn an HTTPException into a JSON response, logging chained errors.

    When the exception carries an explicit cause (it was raised with
    ``raise ... from ...``), it is passed to ``logger.exception`` before
    the response is built.

    Args:
        request: The incoming request; not used by this handler.
        exc: The HTTPException being handled.

    Returns:
        A JSONResponse with the exception's status code and a body of the
        form ``{"detail": <exc.detail>}``.
    """
    underlying = exc.__cause__
    if underlying:
        logger.exception(exc)

    body = {"detail": exc.detail}
    return JSONResponse(status_code=exc.status_code, content=body)
@app.get("/")
def throw_error():
    """Demo route for GET "/" that always fails with a chained HTTPException.

    Raises:
        HTTPException: Status 500 with detail "Something went wrong"; the
            ValueError raised inside the function is attached as its cause.
    """
    try:
        raise ValueError("Whoops")
    except ValueError as e:
        # ``from e`` records the ValueError as __cause__ of the HTTPException.
        raise HTTPException(detail="Something went wrong", status_code=500) from e
# Start the uvicorn server for the app only when this file is run as a script.
if __name__ == "__main__":
    uvicorn.run(app)
输出:
C:\Users\tom.mclean\AppData\Local\pypoetry\Cache\virtualenvs\nemo-deploy-IXJNSscy-py3.11\Scripts\python.exe C:\Users\tom.mclean\AppData\Roaming\JetBrains\PyCharmCE2023.3\scratches\scratch.py
INFO: Started server process [22652]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO: 127.0.0.1:59028 - "GET / HTTP/1.1" 500 Internal Server Error
ERROR:__main__:500: Something went wrong
Traceback (most recent call last):
File "C:\Users\tom.mclean\AppData\Roaming\JetBrains\PyCharmCE2023.3\scratches\scratch.py", line 24, in throw_error
raise ValueError("Whoops")
ValueError: Whoops
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "C:\Users\tom.mclean\AppData\Local\pypoetry\Cache\virtualenvs\nemo-deploy-IXJNSscy-py3.11\Lib\site-packages\starlette\_exception_handler.py", line 42, in wrapped_app
await app(scope, receive, sender)
File "C:\Users\tom.mclean\AppData\Local\pypoetry\Cache\virtualenvs\nemo-deploy-IXJNSscy-py3.11\Lib\site-packages\starlette\routing.py", line 73, in app
response = await f(request)
^^^^^^^^^^^^^^^^
File "C:\Users\tom.mclean\AppData\Local\pypoetry\Cache\virtualenvs\nemo-deploy-IXJNSscy-py3.11\Lib\site-packages\fastapi\routing.py", line 301, in app
raw_response = await run_endpoint_function(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\tom.mclean\AppData\Local\pypoetry\Cache\virtualenvs\nemo-deploy-IXJNSscy-py3.11\Lib\site-packages\fastapi\routing.py", line 214, in run_endpoint_function
return await run_in_threadpool(dependant.call, **values)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\tom.mclean\AppData\Local\pypoetry\Cache\virtualenvs\nemo-deploy-IXJNSscy-py3.11\Lib\site-packages\starlette\concurrency.py", line 39, in run_in_threadpool
return await anyio.to_thread.run_sync(func, *args)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\tom.mclean\AppData\Local\pypoetry\Cache\virtualenvs\nemo-deploy-IXJNSscy-py3.11\Lib\site-packages\anyio\to_thread.py", line 56, in run_sync
return await get_async_backend().run_sync_in_worker_thread(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\tom.mclean\AppData\Local\pypoetry\Cache\virtualenvs\nemo-deploy-IXJNSscy-py3.11\Lib\site-packages\anyio\_backends\_asyncio.py", line 2441, in run_sync_in_worker_thread
return await future
^^^^^^^^^^^^
File "C:\Users\tom.mclean\AppData\Local\pypoetry\Cache\virtualenvs\nemo-deploy-IXJNSscy-py3.11\Lib\site-packages\anyio\_backends\_asyncio.py", line 943, in run
result = context.run(func, *args)
^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\tom.mclean\AppData\Roaming\JetBrains\PyCharmCE2023.3\scratches\scratch.py", line 26, in throw_error
raise HTTPException(detail="Something went wrong", status_code=500) from e
fastapi.exceptions.HTTPException: 500: Something went wrong