异常正在取消异步程序中的任务 - 为什么?

问题描述 投票:0回答:1

main()
程序中存在一个故意错误,应该引发异常(在本例中为
ZeroDivisionError
),但事实并非如此。但那里没有任何东西可以抓住它。相反,它取消了一些待处理的任务,并且程序继续运行。如果运行代码,您将看到包含故意错误的情况下,服务器需要两次尝试运行(因为排队的任务被异常取消),但注释掉错误后,它将在第一次尝试时启动。这个错误不应该让我的程序突然停止吗?

async def main():
    # logging
    loggerTask = asyncio.create_task(initLogging(fileName=ospath.join('log','servers.log')))
    await asyncio.sleep(0)
    logger.info('servers program running')

    tasks = set()

    # socket server
    socketServer = SocketServer(None)
    tasks.add(socketServer.superviseTask)

    result = 1/0    # <----- this should cause an exception but it's not

    done, pending = await asyncio.wait(tasks)

完整代码:

import os.path as ospath
import asyncio
from socket import gethostbyname, gethostname
import logging
from logging.handlers import QueueHandler, QueueListener
from queue import Queue

logger = logging.getLogger()

async def initLogging(format='%(levelname)8s - %(asctime)s - %(name)s - %(msg)s', consoleLevel=logging.INFO, fileLevel=logging.DEBUG, fileName='log.log'):
    # Change root logger level from WARNING (default) to NOTSET in order for all messages to be delegated.
    logger = logging.getLogger()
    logger.setLevel(logging.NOTSET)

    # Add file handler
    fileHandler = logging.FileHandler(filename=fileName, mode='w')
    fileHandler.setLevel(fileLevel)
    fileHandler.setFormatter(logging.Formatter(format))
    logger.addHandler(fileHandler)

    # Add queue handler
    queue = Queue()
    queueHandler = QueueHandler(queue)
    queueHandler.setLevel(logging.INFO)
    queueHandler.setFormatter(logging.Formatter(format))
    logger.addHandler(queueHandler)
    listener = QueueListener(queue, logging.StreamHandler())
    try:
        listener.start()
        logger.info('logger started')
        while True:
            await asyncio.sleep(10)
    finally:
        logger.info('logger finished')
        listener.stop

# async server base class

class AsyncServer():
    def __init__(self, app, name:str, type:str, maxRetryAttempts:int=-1, retryDelay:int=5, autoStart:bool=True) -> None:
        self.app = app
        self.name = name
        self.type = type
        self.retryCount = 0
        self.maxRetryAttempts = maxRetryAttempts # default -1 = unlimited
        self.retryDelay = retryDelay # default = 5 seconds
        if autoStart:
            self.start()

    def log(self, msg):
        return f'{self.type} {self.name} - {msg}'

    async def awaitCoro(self):
        while True:
            self.retryCount += 1
            logger.info(self.log(f'run task starting attempt {self.retryCount}/{self.maxRetryAttempts}'))
            try:
                self.runTask = asyncio.create_task(self.runCoro())
            except Exception as err:
                logger.error(self.log(f'error creating run task - {err}'))
            else:
                try:
                    await self.runTask
                    # self.retryCount = 0
                except asyncio.exceptions.CancelledError as err:
                    logger.info(self.log('run task cancelled'))
                except Exception as err:
                    logger.error(self.log(f'error - {err}'))
            # if self.runTask.done():
            #   break
            if (self.maxRetryAttempts > 0) and (self.retryCount >= self.maxRetryAttempts):
                logger.warning(self.log('max retry attempts reached'))
                break
            logger.info(self.log(f'attempting restart in {self.retryDelay} seconds'))
            await asyncio.sleep(self.retryDelay)
        logger.info(self.log('supervise task cancelled'))

    def start(self):
        logger.info(self.log('starting'))
        self.superviseTask = asyncio.create_task(self.awaitCoro())
        # self.app.taskRegistry.addItem(self.name, self.superviseTask)

    def shutdown(self):
        logger.info(self.log('shutting down'))
        self.superviseTask.cancel()
        self.runTask.cancel()
    
    def restart(self):
        logger.info(self.log('restarting'))
        self.shutdown()
        self.start()

# socket server and client

class SocketBase():
    def __init__(self, hostname:str='', ipaddr:str='', port:int=32843) -> None:
        if hostname:
            self.hostname = hostname
            self.ipaddr = gethostbyname(hostname)
        elif ipaddr:
            self.hostname = ipaddr
            self.ipaddr = ipaddr
        else:
            self.hostname = gethostname()
            self.ipaddr = gethostbyname(self.hostname)
        self.port = port

class SocketServer(SocketBase, AsyncServer):
    def __init__(self, app, hostname: str = '', ipaddr: str = '', port: int = 32843, maxRetryAttempts:int=-1, retryDelay:int=5) -> None:
        SocketBase.__init__(self, hostname, ipaddr, port)
        AsyncServer.__init__(self, app, self.hostname, 'socket server', maxRetryAttempts=maxRetryAttempts, retryDelay=retryDelay)
        self.app = app
        
    async def runCoro(self) -> None:
        try:
            self.server = await asyncio.start_server(self.handler, self.ipaddr, self.port)
            logger.info(self.log(f'listening on {self.ipaddr}:{self.port}'))
            async with self.server:
                await self.server.serve_forever()
        except Exception as err:
            logger.error(self.log(f'error starting server - {err}'))

    async def handler(self, reader:asyncio.StreamReader, writer:asyncio.StreamWriter):
        logger.info(self.log('receiving message'))

async def main():
    # logging
    loggerTask = asyncio.create_task(initLogging(fileName=ospath.join('log','servers.log')))
    await asyncio.sleep(0)
    logger.info('servers program running')

    tasks = set()
    
    # socket server
    socketServer = SocketServer(None)
    tasks.add(socketServer.superviseTask)

    result = 1/0

    done, pending = await asyncio.wait(tasks)

if __name__ == '__main__':
    asyncio.run(main())

Python 3.12.3

python-3.x exception python-asyncio
1个回答
0
投票

当然,

1/0
会立即终止
main
。几乎您看到的所有内容都发生在异步清理期间。此时存在的所有任务都将被取消并等待,但您的代码不会对 CancelledError 做出应有的反应。请看这里:

            try:
                await self.runTask
                # self.retryCount = 0
            except asyncio.exceptions.CancelledError as err:
                logger.info(self.log('run task cancelled'))
                # --- added comment: missing raise here --
            except Exception as err:

这个片段“吞噬”了

asyncio.CancelledError
,并像什么都没发生一样继续下去。这违反了“合同”并停止了清理工作。

asyncio 时可以捕获此异常来执行自定义操作 任务被取消。在几乎所有情况下,例外情况都必须是 重新提出。

在评论标记的地方添加

raise
,再试一次。

© www.soinside.com 2019 - 2024. All rights reserved.