我需要在代码中维护一个持久的 httpx 客户端,以便在应用程序的整个生命周期中利用其连接池。下面是我的实现的简化版本:
import asyncio
import weakref
from threading import Lock
import nest_asyncio
from httpx import AsyncClient
class Request:
_instance = None
_lock = Lock()
_loop: asyncio.AbstractEventLoop = None
_new_loop = False
_session = None
def __new__(cls):
if not cls._instance:
with cls._lock:
if not cls._instance:
instance = super().__new__(cls)
cls._instance = instance
weakref.finalize(instance, instance._close)
if cls._loop is None:
try:
cls._loop = asyncio.get_running_loop()
except RuntimeError:
cls._loop = asyncio.new_event_loop()
cls._new_loop = True
asyncio.set_event_loop(cls._loop)
if cls._new_loop:
# nest_asyncio.apply()
cls._loop.run_until_complete(cls.create_client())
else:
cls._loop.create_task(cls.create_client())
return cls._instance
@classmethod
async def create_client(cls):
cls._session = AsyncClient()
@classmethod
def _close(cls):
cls._loop.run_until_complete(cls.close())
if cls._new_loop:
cls._loop.close()
@classmethod
async def close(cls):
if cls._session:
await cls._session.aclose()
async def get(self, url):
try:
return await self._session.get(url)
except Exception:
return None
req = Request()
async def main():
result = await req.get('https://www.google.com')
if result:
print(result.text[:100])
asyncio.run(main())
这会导致
RuntimeError: Event loop is closed
:
...
await self._pool.aclose()
File "/home/user/.cache/pypoetry/virtualenvs/app-gEFTwlce-py3.12/lib/python3.12/site-packages/httpcore/_async/connection_pool.py", line 313, in aclose
await self._close_connections(closing_connections)
File "/home/user/.cache/pypoetry/virtualenvs/app-gEFTwlce-py3.12/lib/python3.12/site-packages/httpcore/_async/connection_pool.py", line 305, in _close_connections
await connection.aclose()
File "/home/user/.cache/pypoetry/virtualenvs/app-gEFTwlce-py3.12/lib/python3.12/site-packages/httpcore/_async/connection.py", line 171, in aclose
await self._connection.aclose()
File "/home/user/.cache/pypoetry/virtualenvs/app-gEFTwlce-py3.12/lib/python3.12/site-packages/httpcore/_async/http11.py", line 265, in aclose
await self._network_stream.aclose()
File "/home/user/.cache/pypoetry/virtualenvs/app-gEFTwlce-py3.12/lib/python3.12/site-packages/httpcore/_backends/anyio.py", line 55, in aclose
await self._stream.aclose()
File "/home/user/.cache/pypoetry/virtualenvs/app-gEFTwlce-py3.12/lib/python3.12/site-packages/anyio/streams/tls.py", line 201, in aclose
await self.transport_stream.aclose()
File "/home/user/.cache/pypoetry/virtualenvs/app-gEFTwlce-py3.12/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 1287, in aclose
self._transport.close()
File "/nix/store/gmx7bwrwy6s0kk89ij5yj8r8ayai95x1-python3-3.12.5/lib/python3.12/asyncio/selector_events.py", line 1210, in close
super().close()
File "/nix/store/gmx7bwrwy6s0kk89ij5yj8r8ayai95x1-python3-3.12.5/lib/python3.12/asyncio/selector_events.py", line 875, in close
self._loop.call_soon(self._call_connection_lost, None)
File "/nix/store/gmx7bwrwy6s0kk89ij5yj8r8ayai95x1-python3-3.12.5/lib/python3.12/asyncio/base_events.py", line 795, in call_soon
self._check_closed()
File "/nix/store/gmx7bwrwy6s0kk89ij5yj8r8ayai95x1-python3-3.12.5/lib/python3.12/asyncio/base_events.py", line 541, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
解决此问题的唯一方法是取消注释包含
nest_asyncio.apply()
的行。但是,由于 nest_asyncio
包很大程度上基于 asyncio 内部函数并且不再维护,因此我对在代码中使用它不感兴趣。
我的问题是: 为什么当我调用
run_until_complete()
(从 _close()
函数内)时循环会关闭,这会创建一个新的事件循环并立即调用 AyncClient.aclose()
函数?我该如何解决这个问题?
供您参考,相同的代码适用于
aiohttp.ClientSession
,无需使用 nest_asyncio
。
出现此问题是因为
httpx.AsyncClient
和 aiohttp.ClientSession
处理事件循环关闭的方式不同。 httpx.AsyncClient
具有额外的后台任务和连接池管理,这些任务和连接池管理取决于所有连接完全关闭时保持打开状态的循环。当在 run_until_complete()
内调用 _close()
时,循环会提前关闭,从而扰乱此过程。
您应该避免使用
run_until_complete()
而是依赖异步上下文管理器。
要解决此问题,请使用异步上下文来管理
AsyncClient
的生命周期。这是代码:
class Request:
_instance = None
_lock = Lock()
_loop: asyncio.AbstractEventLoop = None
_session = None
def __new__(cls):
if not cls._instance:
with cls._lock:
if not cls._instance:
instance = super().__new__(cls)
cls._instance = instance
weakref.finalize(instance, asyncio.run, instance._close())
cls._loop = asyncio.get_event_loop()
cls._loop.create_task(cls.create_client())
return cls._instance
@classmethod
async def create_client(cls):
cls._session = AsyncClient()
@classmethod
async def _close(cls):
if cls._session:
await cls._session.aclose()
async def get(self, url):
try:
return await self._session.get(url)
except Exception:
return None
req = Request()
async def main():
result = await req.get('https://www.google.com')
if result:
print(result.text[:100])
asyncio.run(main())
希望这对你有一点帮助。