在
main()
程序中存在一个故意错误,应该引发异常(在本例中为 ZeroDivisionError
),但事实并非如此。但那里没有任何东西可以抓住它。相反,它取消了一些待处理的任务,并且程序继续运行。如果运行代码,您将看到包含故意错误的情况下,服务器需要两次尝试运行(因为排队的任务被异常取消),但注释掉错误后,它将在第一次尝试时启动。这个错误不应该让我的程序突然停止吗?
async def main():
# logging
loggerTask = asyncio.create_task(initLogging(fileName=ospath.join('log','servers.log')))
await asyncio.sleep(0)
logger.info('servers program running')
tasks = set()
# socket server
socketServer = SocketServer(None)
tasks.add(socketServer.superviseTask)
result = 1/0 # <----- this should cause an exception but it's not
done, pending = await asyncio.wait(tasks)
完整代码:
import os.path as ospath
import asyncio
from socket import gethostbyname, gethostname
import logging
from logging.handlers import QueueHandler, QueueListener
from queue import Queue
logger = logging.getLogger()
async def initLogging(format='%(levelname)8s - %(asctime)s - %(name)s - %(msg)s', consoleLevel=logging.INFO, fileLevel=logging.DEBUG, fileName='log.log'):
# Change root logger level from WARNING (default) to NOTSET in order for all messages to be delegated.
logger = logging.getLogger()
logger.setLevel(logging.NOTSET)
# Add file handler
fileHandler = logging.FileHandler(filename=fileName, mode='w')
fileHandler.setLevel(fileLevel)
fileHandler.setFormatter(logging.Formatter(format))
logger.addHandler(fileHandler)
# Add queue handler
queue = Queue()
queueHandler = QueueHandler(queue)
queueHandler.setLevel(logging.INFO)
queueHandler.setFormatter(logging.Formatter(format))
logger.addHandler(queueHandler)
listener = QueueListener(queue, logging.StreamHandler())
try:
listener.start()
logger.info('logger started')
while True:
await asyncio.sleep(10)
finally:
logger.info('logger finished')
listener.stop
# async server base class
class AsyncServer():
def __init__(self, app, name:str, type:str, maxRetryAttempts:int=-1, retryDelay:int=5, autoStart:bool=True) -> None:
self.app = app
self.name = name
self.type = type
self.retryCount = 0
self.maxRetryAttempts = maxRetryAttempts # default -1 = unlimited
self.retryDelay = retryDelay # default = 5 seconds
if autoStart:
self.start()
def log(self, msg):
return f'{self.type} {self.name} - {msg}'
async def awaitCoro(self):
while True:
self.retryCount += 1
logger.info(self.log(f'run task starting attempt {self.retryCount}/{self.maxRetryAttempts}'))
try:
self.runTask = asyncio.create_task(self.runCoro())
except Exception as err:
logger.error(self.log(f'error creating run task - {err}'))
else:
try:
await self.runTask
# self.retryCount = 0
except asyncio.exceptions.CancelledError as err:
logger.info(self.log('run task cancelled'))
except Exception as err:
logger.error(self.log(f'error - {err}'))
# if self.runTask.done():
# break
if (self.maxRetryAttempts > 0) and (self.retryCount >= self.maxRetryAttempts):
logger.warning(self.log('max retry attempts reached'))
break
logger.info(self.log(f'attempting restart in {self.retryDelay} seconds'))
await asyncio.sleep(self.retryDelay)
logger.info(self.log('supervise task cancelled'))
def start(self):
logger.info(self.log('starting'))
self.superviseTask = asyncio.create_task(self.awaitCoro())
# self.app.taskRegistry.addItem(self.name, self.superviseTask)
def shutdown(self):
logger.info(self.log('shutting down'))
self.superviseTask.cancel()
self.runTask.cancel()
def restart(self):
logger.info(self.log('restarting'))
self.shutdown()
self.start()
# socket server and client
class SocketBase():
def __init__(self, hostname:str='', ipaddr:str='', port:int=32843) -> None:
if hostname:
self.hostname = hostname
self.ipaddr = gethostbyname(hostname)
elif ipaddr:
self.hostname = ipaddr
self.ipaddr = ipaddr
else:
self.hostname = gethostname()
self.ipaddr = gethostbyname(self.hostname)
self.port = port
class SocketServer(SocketBase, AsyncServer):
def __init__(self, app, hostname: str = '', ipaddr: str = '', port: int = 32843, maxRetryAttempts:int=-1, retryDelay:int=5) -> None:
SocketBase.__init__(self, hostname, ipaddr, port)
AsyncServer.__init__(self, app, self.hostname, 'socket server', maxRetryAttempts=maxRetryAttempts, retryDelay=retryDelay)
self.app = app
async def runCoro(self) -> None:
try:
self.server = await asyncio.start_server(self.handler, self.ipaddr, self.port)
logger.info(self.log(f'listening on {self.ipaddr}:{self.port}'))
async with self.server:
await self.server.serve_forever()
except Exception as err:
logger.error(self.log(f'error starting server - {err}'))
async def handler(self, reader:asyncio.StreamReader, writer:asyncio.StreamWriter):
logger.info(self.log('receiving message'))
async def main():
# logging
loggerTask = asyncio.create_task(initLogging(fileName=ospath.join('log','servers.log')))
await asyncio.sleep(0)
logger.info('servers program running')
tasks = set()
# socket server
socketServer = SocketServer(None)
tasks.add(socketServer.superviseTask)
result = 1/0
done, pending = await asyncio.wait(tasks)
if __name__ == '__main__':
asyncio.run(main())
Python 3.12.3
当然,
1/0
会立即终止main
。几乎您看到的所有内容都发生在异步清理期间。此时存在的所有任务都将被取消并等待,但您的代码不会对 CancelledError 做出应有的反应。请看这里:
try:
await self.runTask
# self.retryCount = 0
except asyncio.exceptions.CancelledError as err:
logger.info(self.log('run task cancelled'))
# --- added comment: missing raise here --
except Exception as err:
这个片段“吞噬”了
asyncio.CancelledError
,并像什么都没发生一样继续下去。这违反了“合同”并停止了清理工作。
asyncio 时可以捕获此异常来执行自定义操作 任务被取消。在几乎所有情况下,例外情况都必须是 重新提出。
在评论标记的地方添加
raise
,再试一次。