最终目标是如果客户端超过超时时间,则在服务器端取消请求。
启动服务器相关代码:
def run_server_loop(
routes: web.RouteTableDef,
shutdown_state: ShutdownState,
logger: Logger,
*,
port: int,
periodic_callback: Callable[[], None] | None = None,
shutdown_callback: Callable[[], None] | None = None,
) -> None:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
_run_server(
routes,
shutdown_state,
logger,
port=port,
periodic_callback=periodic_callback,
)
)
finally:
if shutdown_callback is not None:
shutdown_callback()
logger.info('Server stopped')
flush_logs()
async def _run_server(
routes: web.RouteTableDef,
shutdown_state: ShutdownState,
logger: Logger,
*,
port: int,
periodic_callback: Callable[[], None] | None = None,
) -> None:
try:
app = web.Application()
app.add_routes(routes)
runner = web.AppRunner(
app,
access_log_format=(
'%a %t [%D μs] "%r" %{Content-Length}i %s '
'%b "%{Referer}i" "%{User-Agent}i"'
),
)
await runner.setup()
site = web.TCPSite(runner, port=port)
await site.start()
logger.info(f'Listening {site.name}')
while not shutdown_state.is_shutdown_requested:
await asyncio.sleep(0.1)
if periodic_callback is not None:
periodic_callback()
await runner.cleanup()
except: # noqa
logger.critical('Unhandled exception', exc_info=True)
raise
这是我的端点代码:
@routes.get('/ping')
async def handle_ping(_) -> web.Response:
try:
import time
import asyncio
for i in range(10):
await asyncio.sleep(1)
return web.json_response(
data=PingResult(
service_name=service_name,
version=SERVICE_VERSION,
storage_path=str(storage_dir.path),
daemon_pid=daemon.pid,
daemon_status=str(daemon.status.value),
).dict()
)
except asyncio.CancelledError as ce:
print('Request was cancelled')
return HTTPBadRequest(ErrorResult(error='Request was cancelled'))
客户端代码
async def ping(timeout=10) -> PingResult:
async with aiohttp.ClientSession(timeout=ClientTimeout(total=timeout)) as session:
async with session.get('http://localhost:5002/ping') as resp:
body = await resp.json()
return PingResult.parse_obj(body)
型号
from aiohttp import web
from pydantic import BaseModel
class ErrorResult(TypedDict):
error: str
class HTTPBadRequest(web.HTTPBadRequest):
def __init__(self, error: Mapping) -> None:
super().__init__(text=dumps(error), content_type='application/json')
class PingResult(BaseModel):
service_name: str
version: str
storage_path: str
daemon_pid: int
daemon_status: str
即使我调用
ping(timeout=2)
我也可以看到服务器上的请求没有被取消。或者,如果我调用 curl http://localhost:5002/ping
并在不到 2-3 秒的时间内终止命令,我会得到相同的行为(服务器端代码无需任何终止即可工作)。
我似乎误解了取消请求的整个想法,但我可以弄清楚如何实现我的主要目标。
在 aiohttp 的当前版本中,处理程序取消是一项可选功能(因为它会给来自其他框架的开发人员带来问题)。
可以通过将
handler_cancellation=True
传递给 web.run_app()
或 AppRunner
来启用它。
https://docs.aiohttp.org/en/stable/web_advanced.html#peer-disconnection