我正在关注 this 来实现一个快速回调示例。为方便起见,我在同一个应用程序中创建回调目标端点,但启动两个服务以模拟回调目标在不同主机中的情况。
在一个外壳中,
uvicorn app:app --reload --port 8000
,在另一个外壳中uvicorn app:app --reload --port 12345
。现在在第三个shell中,调用curl -X POST -H "Content-Type: application/json" -d '{"name":"John Doe"}' "http://127.0.0.1:8000/process?callback_url=http://127.0.0.1:12345"
虽然
/process
调用有效(我确实看到第一个 shell 中打印了“正在处理输入...”,但似乎从未命中回调端点(第二个 shell 中全部安静)。我在这里错过了什么吗?
from fastapi import APIRouter, FastAPI
from pydantic import BaseModel, HttpUrl
app = FastAPI()
class Input(BaseModel):
name: str
class Output(BaseModel):
message: str
callback_router = APIRouter()
@callback_router.post("{$callback_url}/callback")
def process_notification(body: Output) -> None:
pass
@app.post("/process", response_model=Output, callbacks=callback_router.routes)
def process(input: Input, callback_url: HttpUrl | None) -> Output:
print("Processing input...")
message = f"Hello, {input.name}!"
return Output(message=message)
@app.post("/callback")
def callback(output: Output) -> str:
print("Callback received...")
message = f"it said {output.message}"
return message