我有一个 API 端点(FastAPI / Uvicorn)。除此之外,它还向另一个 API 请求信息。当我使用多个并发请求加载 API 时,我开始收到以下错误:
h11._util.LocalProtocolError: can't handle event type ConnectionClosed when role=SERVER and state=SEND_RESPONSE
在正常环境中,我会利用
request.session
,但我知道它不是完全线程安全的。
因此,在 FastAPI 等框架内使用请求的正确方法是什么,其中多个线程将同时使用
requests
库?
您可以使用
requests
,而不是使用
httpx
,它也提供了 async
API(在执行 httpx
测试时,FastAPI 的文档中也建议使用
async
),以及FastAPI/Starlette 最近 将 TestClient
上的 HTTP 客户端从 requests
替换为 httpx
)。
httpx
文档中给出的示例,演示如何使用该库发出异步 HTTP(s) 请求,然后将响应流式传输回客户端。您可以使用 httpx.AsyncClient()
来代替 requests.Session()
,这在向同一主机发出多个请求时非常有用,因为底层 TCP 连接将被重用,而不是为每个请求重新创建一个连接 - 因此,从而显着提高性能。此外,它还允许您重复使用 headers
和其他设置(例如 proxies
和 timeout
),以及跨请求保留 cookies
。您生成一个 Client
并在每次需要时重复使用它。完成后,您可以使用 await client.aclose()
显式关闭客户端(您可以在 shutdown
事件处理程序中执行此操作)。示例和更多详细信息也可以在这个答案中找到。
from fastapi import FastAPI
from starlette.background import BackgroundTask
from fastapi.responses import StreamingResponse
import httpx
app = FastAPI()
@app.on_event("startup")
async def startup_event():
app.state.client = httpx.AsyncClient()
@app.on_event('shutdown')
async def shutdown_event():
await app.state.client.aclose()
@app.get('/')
async def home():
client = app.state.client
req = client.build_request('GET', 'https://www.example.com/')
r = await client.send(req, stream=True)
return StreamingResponse(r.aiter_raw(), background=BackgroundTask(r.aclose))
startup
和 shutdown
现已被弃用(将来可能会完全删除),您可以使用 lifespan
处理程序 来初始化 httpx
客户端,以及关闭关闭时的客户端实例,类似于这个答案中演示的内容。 Starlette 在其文档页面中专门提供了一个使用 lifespan
处理程序和 httpx
客户端的示例。如 Starlette 的文档中所述:
有lifespan
的概念,这是一本字典, 可用于在生命周期和生命周期之间共享对象 请求。state
请求中收到的
是状态的浅表副本 在寿命处理程序上收到。state
因此,可以使用
request.state
在端点内部访问添加到生命周期处理程序中状态的对象。下面的示例使用流响应来与外部服务器通信,并将响应发送回客户端。有关 async
的 httpx
响应流方法(即 aiter_bytes()
、aiter_text()
、aiter_lines()
等)的更多详细信息,请参阅 here。
如果您想使用
media_type
作为 StreamingResponse
,您可以使用原始响应中的一个,如下所示:media_type=r.headers['content-type']
。但是,如这个答案中所述,您需要确保media_type
未设置为text/plain
;否则,内容将不会按预期在浏览器中传输,除非您禁用 MIME 嗅探(查看链接的答案以获取更多详细信息和解决方案)。
from fastapi import FastAPI, Request
from contextlib import asynccontextmanager
from fastapi.responses import StreamingResponse
from starlette.background import BackgroundTask
import httpx
@asynccontextmanager
async def lifespan(app: FastAPI):
# Initialise the Client on startup and add it to the state
async with httpx.AsyncClient() as client:
yield {'client': client}
# The Client closes on shutdown
app = FastAPI(lifespan=lifespan)
@app.get('/')
async def home(request: Request):
client = request.state.client
req = client.build_request('GET', 'https://www.example.com')
r = await client.send(req, stream=True)
return StreamingResponse(r.aiter_raw(), background=BackgroundTask(r.aclose))
如果出于任何原因,您需要在响应客户端之前在服务器端逐块读取内容,您可以按如下方式执行此操作:
@app.get('/')
async def home(request: Request):
client = request.state.client
req = client.build_request('GET', 'https://www.example.com')
r = await client.send(req, stream=True)
async def gen():
async for chunk in r.aiter_raw():
yield chunk
await r.aclose()
return StreamingResponse(gen())
如果您不想使用流式响应,而是首先
httpx
为您读取响应(这会将响应数据存储到服务器的 RAM 中;因此,您应该确保有足够的可用空间来容纳数据),您可以使用以下内容。请注意,使用 r.json()
只适用于响应数据为 JSON 格式的情况;否则,您可以直接返回
PlainTextResponse
或自定义 Response
,如下所示。
from fastapi import Response
from fastapi.responses import PlainTextResponse
@app.get('/')
async def home(request: Request):
client = request.state.client
req = client.build_request('GET', 'https://www.example.com')
r = await client.send(req)
content_type = r.headers.get('content-type')
if content_type == 'application/json':
return r.json()
elif content_type == 'text/plain':
return PlainTextResponse(content=r.text)
else:
return Response(content=r.content)
async
的
httpx
API 意味着您必须使用
async def
定义端点;否则,您将必须使用标准同步API(对于
def
与
async def
,请参阅此答案),并如此github讨论中所述:
是的。您还可以使用
HTTPX
旨在线程安全, 是的,单个 跨所有线程的客户端实例在以下方面会做得更好 连接池,而不是使用每个线程实例。
limits
上的
Client
关键字参数来控制连接池大小(请参阅池限制配置)。例如:
limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
client = httpx.Client(limits=limits)