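I am writing a Python script that checks the uptime of several domains every few minutes. I run one coroutine per website in a while loop and have each one sleep after its check completes. This works, but I also want to be able to cancel them. The problem is that when I await these coroutines in asyncio.gather(), they block the thread, because they never return a result.
If I remove
await asyncio.gather(*tasks.values(), return_exceptions=True)
I get RuntimeError: Session is closed
How can I run them without blocking the thread?
Here is a simplified version of the code. I am using a simple aiohttp server for testing.
Server code: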
from aiohttp import web
import asyncio
import random

async def handle(request: web.Request) -> web.Response:
    await asyncio.sleep(random.randint(0, 3))
    return web.Response(text=f"Hello, from {request.rel_url.path}")

app = web.Application()
app.router.add_route('GET', '/{name}', handle)
web.run_app(app)
Uptime check code:
import asyncio
import aiohttp

LIMIT = 2

async def check_uptime_coro(session: aiohttp.ClientSession, url: str, semaphore: asyncio.BoundedSemaphore) -> None:
    while True:
        try:
            async with semaphore:
                async with session.get(url) as response:
                    if response.status != 200:
                        print(f"error with {url} {response.status}")
                    else:
                        print(f"success with {url}")
            await asyncio.sleep(5)
        except Exception as e:
            print(f"error with {url} {e}")

async def main() -> None:
    urls = [f"http://localhost:8080/{x}" for x in range(0, 10)]
    tasks = {}
    semaphore = asyncio.BoundedSemaphore(LIMIT)
    try:
        async with aiohttp.ClientSession() as session:
            for url in urls:
                tasks[url] = asyncio.create_task(
                    check_uptime_coro(session, url, semaphore))
            await asyncio.gather(*tasks.values(), return_exceptions=True)
        print("This doesn't print!")
    except Exception as e:
        print(f"error! {e}")

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    print("This also doesn't print!")
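You have to understand that your call to asyncio.gather is not "blocking the thread"; it is blocking the task that awaits it.
If you want to run other things while your checker tasks are running and the web session is open, just fire the gather that syncs them in a different task from the one in which you want to keep running things.
If the tasks never return, you don't actually need to call gather at all: just keep all your tasks in a set or other container and hold on to it, so that they are not dereferenced and can be properly cancelled when the time comes.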
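As a minimal sketch of that last idea, the snippet below keeps the tasks in a dict and cancels them later. It leaves aiohttp out so it runs on its own: fake_check is a hypothetical stand-in for the real checker, and the URLs and timings are placeholders. Here gather is only used at shutdown, to wait for the cancellations to finish.

```python
import asyncio


async def fake_check(url: str, interval: float) -> None:
    """Hypothetical stand-in for the real checker: loops until cancelled."""
    while True:
        # A real checker would issue the HTTP request here.
        await asyncio.sleep(interval)


async def run_and_cancel() -> list:
    urls = [f"http://localhost:8080/{x}" for x in range(3)]  # placeholder URLs
    # The dict holds a strong reference to every Task, so none of them can be
    # garbage-collected while it is still running.
    tasks = {url: asyncio.create_task(fake_check(url, 0.01)) for url in urls}

    await asyncio.sleep(0.05)  # the main task is free to do other work here

    # Cancel a single checker by its key...
    tasks[urls[0]].cancel()
    # ...or all of them when shutting down (cancelling twice is harmless).
    for task in tasks.values():
        task.cancel()
    # Wait until the cancellations have been processed. return_exceptions=True
    # collects each CancelledError instead of raising it here.
    await asyncio.gather(*tasks.values(), return_exceptions=True)
    return [task.cancelled() for task in tasks.values()]


if __name__ == "__main__":
    print(asyncio.run(run_and_cancel()))
```

With a real session, the same pattern applies as long as the session stays open until the checkers have been cancelled.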
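Other than that, the only reason you get an error when you don't await the gathered tasks is that execution leaves the with block that opens the session, which closes the session while the checkers are still using it.
You could skip the with block altogether and call the __aenter__ and __aexit__ methods manually (the session is opened with async with, so these are the async counterparts of __enter__ and __exit__), but you can also simply rewrite things so that the with block lives in a separate task, as I mentioned above. In Python 3.11 and later, you can use task groups: they work more nicely than gather, and cancel all the checker tasks when the parent task itself is cancelled.
import asyncio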
import aiohttp

LIMIT = 2

async def check_uptime_coro(session: aiohttp.ClientSession, url: str, semaphore: asyncio.BoundedSemaphore) -> None:
    while True:
        try:
            async with semaphore:
                async with session.get(url) as response:
                    if response.status != 200:
                        print(f"error with {url} {response.status}")
                    else:
                        print(f"success with {url}")
            await asyncio.sleep(5)
        except Exception as e:
            print(f"error with {url} {e}")

async def check_uptime_master():
    urls = [f"http://localhost:8080/{x}" for x in range(0, 10)]
    tasks = {}
    semaphore = asyncio.BoundedSemaphore(LIMIT)
    # The session stays open for as long as this task is awaiting the gather
    async with aiohttp.ClientSession() as session:
        for url in urls:
            tasks[url] = asyncio.create_task(
                check_uptime_coro(session, url, semaphore))
        await asyncio.gather(*tasks.values(), return_exceptions=True)

async def main() -> None:
    try:
        checker_task = asyncio.create_task(check_uptime_master())
        await asyncio.sleep(0)  # give the asyncio loop a chance to fire up the subtasks
        print("This now, does print!")
    except Exception as e:
        print(f"error! {e}")
    # go on with your code on the main task, don't forget to yield to the loop
    # so subtasks can run!
    ...

if __name__ == "__main__":
    asyncio.run(main())  # this is the new recommended way to fire asyncio
    # loop = asyncio.get_event_loop()  # <- obsolete
    # loop.run_until_complete(main())  # <- obsolete
    print("This doesn't print while main() keeps running - cancel `checker_task` explicitly, or let main() return so asyncio.run() cancels it")