在 Uvicorn/FastAPI 内发出下游 HTTP 请求的正确方法是什么?

问题描述 投票:0回答:1

我有一个 API 端点(FastAPI / Uvicorn)。除此之外,它还向另一个 API 请求信息。当我使用多个并发请求加载 API 时,我开始收到以下错误:

h11._util.LocalProtocolError: can't handle event type ConnectionClosed when role=SERVER and state=SEND_RESPONSE

在正常环境中,我会利用

request.session
,但我知道它不是完全线程安全的。

因此,在 FastAPI 等框架内使用请求的正确方法是什么,其中多个线程将同时使用

requests
库?

python python-requests fastapi starlette httpx
1个回答
13
投票

您可以使用

requests
,而不是使用 
httpx
,它也提供了
async
API
(在执行 httpx
 测试时,FastAPI 的文档
中也建议使用
async
),以及FastAPI/Starlette 最近
TestClient
上的 HTTP 客户端从
requests
替换为
httpx
)。

下面的示例基于

httpx
文档中给出的示例,演示如何使用该库发出异步 HTTP(s) 请求,然后将响应流式传输回客户端。您可以使用
httpx.AsyncClient()
来代替
requests.Session()
,这在向同一主机发出多个请求时非常有用,因为底层 TCP 连接将被重用,而不是为每个请求重新创建一个连接 - 因此,从而显着提高性能。此外,它还允许您重复使用
headers
和其他设置(例如
proxies
timeout
),以及跨请求保留
cookies
。您生成一个
Client
并在每次需要时重复使用它。完成后,您可以使用
await client.aclose()
显式关闭客户端(您可以在
shutdown
事件
处理程序中执行此操作)。示例和更多详细信息也可以在这个答案中找到。

示例

from fastapi import FastAPI
from starlette.background import BackgroundTask
from fastapi.responses import StreamingResponse
import httpx


app = FastAPI()


@app.on_event("startup")
async def startup_event():
    app.state.client = httpx.AsyncClient()


@app.on_event('shutdown')
async def shutdown_event():
    await app.state.client.aclose()


@app.get('/')
async def home():
    client = app.state.client
    req = client.build_request('GET', 'https://www.example.com/')
    r = await client.send(req, stream=True)
    return StreamingResponse(r.aiter_raw(), background=BackgroundTask(r.aclose))

示例(已更新)

由于

startup
shutdown
现已被弃用
(将来可能会完全删除),您可以使用
lifespan
处理程序
来初始化
httpx
客户端,以及关闭关闭时的客户端实例,类似于这个答案中演示的内容。 Starlette 在其文档页面中专门提供了一个使用
lifespan
处理程序和
httpx
客户端的示例。如 Starlette 的文档中所述:

lifespan
state
的概念,这是一本字典, 可用于在生命周期和生命周期之间共享对象 请求。

请求中收到的

state
是状态的浅表副本 在寿命处理程序上收到。

因此,可以使用

request.state
在端点内部访问添加到生命周期处理程序中状态的对象。下面的示例使用流响应来与外部服务器通信,并将响应发送回客户端。有关 async
httpx
响应流方法(即
aiter_bytes()
aiter_text()
aiter_lines()
等)的更多详细信息,请参阅
here

如果您想使用

media_type
作为
StreamingResponse
,您可以使用原始响应中的一个,如下所示:
media_type=r.headers['content-type']
。但是,如这个答案中所述,您需要确保
media_type
未设置为
text/plain
;否则,内容将不会按预期在浏览器中传输,除非您禁用 MIME 嗅探(查看链接的答案以获取更多详细信息和解决方案)。

from fastapi import FastAPI, Request
from contextlib import asynccontextmanager
from fastapi.responses import StreamingResponse
from starlette.background import BackgroundTask
import httpx


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Initialise the Client on startup and add it to the state
    async with httpx.AsyncClient() as client:
        yield {'client': client}
        # The Client closes on shutdown 


app = FastAPI(lifespan=lifespan)


@app.get('/')
async def home(request: Request):
    client = request.state.client
    req = client.build_request('GET', 'https://www.example.com')
    r = await client.send(req, stream=True)
    return StreamingResponse(r.aiter_raw(), background=BackgroundTask(r.aclose)) 

如果出于任何原因,您需要在响应客户端之前在服务器端逐块读取内容,您可以按如下方式执行此操作:

@app.get('/')
async def home(request: Request):
    client = request.state.client
    req = client.build_request('GET', 'https://www.example.com')
    r = await client.send(req, stream=True)
    
    async def gen():
        async for chunk in r.aiter_raw():
            yield chunk
        await r.aclose()
        
    return StreamingResponse(gen())

如果您不想使用流式响应,而是首先

httpx
为您读取响应(这会将响应数据存储到服务器的 RAM 中;因此,您应该确保有足够的可用空间来容纳数据),您可以使用以下内容。请注意,使用 r.json()
 只适用于响应数据为 JSON 格式的情况;否则,您可以直接返回 
PlainTextResponse
 或自定义 
Response
,如下所示。

from fastapi import Response from fastapi.responses import PlainTextResponse @app.get('/') async def home(request: Request): client = request.state.client req = client.build_request('GET', 'https://www.example.com') r = await client.send(req) content_type = r.headers.get('content-type') if content_type == 'application/json': return r.json() elif content_type == 'text/plain': return PlainTextResponse(content=r.text) else: return Response(content=r.content)


使用

async

httpx
 API 意味着您必须使用 
async def
 定义端点;否则,您将必须使用
标准同步API(对于def
async def
,请参阅
此答案),并如此github讨论中所述:

是的。

HTTPX

 
旨在线程安全, 是的,单个 跨所有线程的客户端实例在以下方面会做得更好 连接池,而不是使用每个线程实例。

您还可以使用

limits

 上的 
Client
 关键字参数来控制连接池大小(请参阅
池限制配置)。例如:

limits = httpx.Limits(max_keepalive_connections=5, max_connections=10) client = httpx.Client(limits=limits)
    
© www.soinside.com 2019 - 2024. All rights reserved.