asyncio.create_connection在每次调用时实例化ThreadPoolExecuter

问题描述 投票:0回答:1

下面的tcp客户端代码基本上循环,直到它能够与tcp服务器建立连接。在vs代码中调试时,我注意到每次调用create_connection()似乎都会创建一个新的ThreadPoolExecutor实例。当我的tcp服务器没有运行时,这可能会出现问题,因为好像会创建无限数量的实例。当服务器未运行时,多次处理调用create_connection()的正确方法是什么?

enter image description here

async def do_tcp_connect(host, port, queue, shutdown_event, callbacks, connection_list):

    logger = logging.getLogger('do_tcp_connect')
    logger.debug('do_tcp_connect(): started')

    while True:
        try:
            transport, protocol = await asyncio.get_running_loop().create_connection(lambda: asyncio_callback(host, port, queue, shutdown_event, callbacks, connection_list), host, port)
            return
        except (KeyboardInterrupt, SystemExit):
            raise
        except asyncio.CancelledError as ex:
            logger.error("network_queue_consumer(): CancelledError: A asyncio coroutine task was cancelled, error={}".format(ex))
        except AttributeError as ex:
            logger.error("network_queue_consumer(): AttributeError: An attribute reference or assignment has failed, error:{}".format(ex))
        except OSError as ex:
            logger.error("network_queue_consumer(): OSError error:{}".format(ex))
        except asyncio.TimeoutError:
            continue
        except Exception as ex:
            logger.error('network_queue_consumer(): Exception occurred, error={}'.format(ex))
        except:
            logger.error("network_queue_consumer(): Unexpected error: {}".format(sys.exc_info()[0]))
            raise

        await asyncio.sleep(5.0)

    logger.debug('do_tcp_connect(): completed')
python-asyncio python-3.7
1个回答
0
投票

我查看了asyncio源代码,发现ThreadPoolExecutor正在下面的方法中实例化。调用loop.set_default_executor(executor)似乎解决了我遇到的问题。

asyncio:base_events.py:742
    def run_in_executor(self, executor, func, *args):
        self._check_closed()
        if self._debug:
            self._check_callback(func, 'run_in_executor')
        if executor is None:
            executor = self._default_executor
            if executor is None:
                executor = concurrent.futures.ThreadPoolExecutor()
                self._default_executor = executor
        return futures.wrap_future(
            executor.submit(func, *args), loop=self)
© www.soinside.com 2019 - 2024. All rights reserved.