我正在编写 python 上下文管理器,它运行异步任务。如果我的经理的任何任务抛出异常,我希望我的经理终止。这是示例代码:
class MyClass:
def __init__(self):
if asyncio.get_event_loop().is_closed():
asyncio.set_event_loop(asyncio.new_event_loop())
self.loop = asyncio.get_event_loop()
def __enter__(self):
return self
def __exit__(self, excType, excValue, tb):
try:
self.loop.run_until_complete(self._exit_loop())
finally:
self.loop.close()
if excType is not None:
print(excType.__name__, ':', excValue)
traceback.print_tb(tb)
async def _exit_loop(self):
tasks = [task for task in asyncio.all_tasks(self.loop) if
task is not asyncio.current_task(self.loop)]
list(map(lambda task: task.cancel(), tasks))
results = await asyncio.gather(*tasks, return_exceptions=True)
self.loop.stop()
async def func1(self):
while True:
print('func1')
await asyncio.sleep(1)
async def func2(self):
i = 5
while i > 0:
print('func2')
await asyncio.sleep(1)
i -= 1
raise Exception
async def _async_start(self):
self.loop.create_task(self.func1())
self.loop.create_task(self.func2())
def start(self):
self.loop.run_until_complete(self._async_start())
with MyClass() as myClass:
myClass.start()
myClass.loop.run_forever()
这是该脚本的输出:
func1
func2
func1
func2
func1
func2
func1
func2
func1
func2
Task exception was never retrieved
func1
future: <Task finished coro=<MyClass.func2() done, defined at /home/framal/Programy/schnapps/schnapps/bottle/client.py:381> exception=Exception()>
Traceback (most recent call last):
File "/home/framal/Programy/schnapps/schnapps/bottle/client.py", line 387, in func2
raise Exception
Exception
func1
func1
func1
.
.
.
我尝试使用自定义异常处理程序,但没有任何效果 - 它们在强制终止进程后立即开始运行。
如何将异常传递给循环,以便它关闭所有其他任务?
更新:在 Python 3.11 中引入了 TaskGroup。它具有所需的功能。
警告:如果您正在寻找标题中问题的解决方案,并且您的代码没有像问题中的代码一样以不寻常的方式启动事件循环,则不要以不寻常的方式停止循环,例如在这个答案中。
我不明白你为什么要这样使用上下文管理器(CM)。
无论如何,如果给定了 CM 并且您将
loop.run_forever()
放入 with
块,那么我知道在这种情况下退出循环以便控制权传递给 CM 的退出函数的唯一方法是 loop.stop()
。
这是一个小装饰器,处理除使用
loop.stop()
取消之外的所有异常。
def watchdog(afunc):
@functools.wraps(afunc)
async def run(*args, **kwargs):
try:
await afunc(*args, **kwargs)
except asyncio.CancelledError:
raise
except Exception as err:
print(f"exception {err}")
# see the warning
# normally you should start a shutdown code here
asyncio.get_event_loop().stop()
return run
如果您装饰由 CM 作为任务启动的所有协程(
func1
和 func2
),例如:
@watchdog
async def func2(self):
然后它将在第一次异常后停止。