Error: Event loop is closed in ThreadPoolExecutor

Question  Votes: 0  Answers: 1

In a .py module I created this function:


    from utils.telegram_utils import telegram_message

    ...

    counter = count(1)

    def open_websites_with_progress(url):
        risultato = open_websites(url)
        progress = next(counter)

        if progress in milestones:
            percentage = (progress / total_urls) * 100

            message = str(f"{percentage:.0f}% completato ({progress}/{total_urls})")

            logger.info(message)
            if __name__ == '__main__':
                print("PROVA")
                asyncio.run(telegram_message(message))

        if risultato == 'TimeoutError':
            logger.warning(f"TimeoutError: xxxxx{url}")
            return

        elif risultato == 'GoToError':
            logger.error(f"GoToError: xxxxx{url}")
            return

        elif risultato == 'MatchNotFound':
            logger.warning(f"MatchNotFound: xxxxx{url}")
            return

        return risultato

    with ThreadPoolExecutor(max_workers=3) as executor_2:
        results = executor_2.map(open_websites_with_progress, combined_total_urls)

The problem is with the function called from another module, utils/telegram_utils.py:

async def telegram_message(message):
    await bot.send_message(chat_id=GROUP_CHAT_ID, text=message)

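Normally it works when called through asyncio, but in this case, putting it inside a ThreadPoolExecutor causes problems. The first time it works and posts the first "print" to the bot, but the second time it stops working and reports the following error: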

PROVA
PROVA
PROVA
Traceback (most recent call last):
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\telegram\request\_baserequest.py", line 333, in _request_wrapper
    code, payload = await self.do_request(
                    ^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\telegram\request\_httpxrequest.py", line 292, in do_request
    res = await self._client.request(
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpx\_client.py", line 1585, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpx\_client.py", line 1674, in send
    response = await self._send_handling_auth(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpx\_client.py", line 1702, in _send_handling_auth
    response = await self._send_handling_redirects(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpx\_client.py", line 1739, in _send_handling_redirects
    response = await self._send_single_request(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpx\_client.py", line 1776, in _send_single_request
    response = await transport.handle_async_request(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpx\_transports\default.py", line 377, in handle_async_request
    resp = await self._pool.handle_async_request(req)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpcore\_async\connection_pool.py", line 216, in handle_async_request
    raise exc from None
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpcore\_async\connection_pool.py", line 189, in handle_async_request
    await self._close_connections(closing)
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpcore\_async\connection_pool.py", line 305, in _close_connections
    await connection.aclose()
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpcore\_async\connection.py", line 171, in aclose
    await self._connection.aclose()
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpcore\_async\http11.py", line 265, in aclose
    await self._network_stream.aclose()
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpcore\_backends\anyio.py", line 55, in aclose
    await self._stream.aclose()
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\anyio\streams\tls.py", line 202, in aclose
    await self.transport_stream.aclose()
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\anyio\_backends\_asyncio.py", line 1258, in aclose
    self._transport.close()
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\asyncio\proactor_events.py", line 109, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\asyncio\base_events.py", line 795, in call_soon
    self._check_closed()
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\asyncio\base_events.py", line 541, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\lucag\Desktop\Exchange-main\Exchange-main\betfair.py", line 190, in <module>
    betfair_scraper()
  File "C:\Users\lucag\Desktop\Exchange-main\Exchange-main\betfair.py", line 176, in betfair_scraper
    for result in results:
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\concurrent\futures\_base.py", line 619, in result_iterator
    yield _result_or_cancel(fs.pop())
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\concurrent\futures\_base.py", line 317, in _result_or_cancel
    return fut.result(timeout)
           ^^^^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\concurrent\futures\_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\concurrent\futures\_base.py", line 401, in __get_result
    raise self._exception
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\concurrent\futures\thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\Desktop\Exchange-main\Exchange-main\betfair.py", line 157, in open_websites_with_progress
    asyncio.run(telegram_message(message))
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\asyncio\runners.py", line 194, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\asyncio\runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\asyncio\base_events.py", line 687, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "C:\Users\lucag\Desktop\Exchange-main\Exchange-main\utils\telegram_utils.py", line 10, in telegram_message
    await bot.send_message(chat_id=GROUP_CHAT_ID, text=message)
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\telegram\_bot.py", line 1029, in send_message
    return await self._send_message(
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\telegram\_bot.py", line 745, in _send_message
    result = await self._post(
             ^^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\telegram\_bot.py", line 623, in _post
    return await self._do_post(
           ^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\telegram\_bot.py", line 652, in _do_post
    result = await request.post(
             ^^^^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\telegram\request\_baserequest.py", line 201, in post
    result = await self._request_wrapper(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\telegram\request\_baserequest.py", line 345, in _request_wrapper
    raise NetworkError(f"Unknown error in HTTP implementation: {exc!r}") from exc
telegram.error.NetworkError: Unknown error in HTTP implementation: RuntimeError('Event loop is closed')

I tried to set up an initial loop and then close it, but I could not get it to work, or I do not know how to do it.

python python-asyncio python-telegram-bot event-loop threadpoolexecutor
1 Answer

0 votes

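It is hard to determine why you are getting RuntimeError: Event loop is closed, because what you posted is far less than a minimal, reproducible example. But I observe that every time you execute asyncio.run(telegram_message(message)) you are creating a new event loop. That is inefficient. Could it also be the cause of your error? I don't know, but perhaps the event loop is being handed to another thread, which then tries to run an async function on it after asyncio.run has completed and the event loop has been closed.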
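The point about asyncio.run can be checked in isolation. The sketch below uses a hypothetical coroutine, record_loop, that only remembers which loop ran it. It shows that two asyncio.run calls use two different loops, that both loops are closed afterwards, and that scheduling anything on a closed loop raises the same RuntimeError that appears at the bottom of the first traceback. Whether this is what happens inside the bot's HTTP client is an assumption: the traceback is consistent with a connection opened under an earlier loop being closed under a later one, but the posted code is not enough to confirm it.

```python
import asyncio

seen_loops = []

async def record_loop():
    # Hypothetical helper: remember the loop that is running this coroutine.
    seen_loops.append(asyncio.get_running_loop())

# Each asyncio.run() call creates a fresh loop and closes it on exit.
asyncio.run(record_loop())
asyncio.run(record_loop())

same_loop = seen_loops[0] is seen_loops[1]
all_closed = all(loop.is_closed() for loop in seen_loops)

# Anything still holding a reference to the first loop can no longer
# schedule work on it.
try:
    seen_loops[0].call_soon(print, "never runs")
    error_text = None
except RuntimeError as exc:
    error_text = str(exc)

print(same_loop, all_closed, error_text)
```

Any object that outlives a single asyncio.run call, and that keeps transports tied to the loop it was first used on, can run into this.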

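So try the following, with no guarantee that it will solve your problem: give each thread in the pool its own event loop, opened once when the pool is initialized and closed when the pool thread terminates. Specify a pool initializer function that creates a new event loop for each thread and saves it in thread-local storage. The event loop is wrapped in a simple class instance which, when it is garbage-collected as the thread terminates, closes the loop first: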

import asyncio
from threading import local

class Loop():
    def __init__(self):
        self._loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self._loop)

    def __call__(self):
        return self._loop

    def __del__(self):
        self._loop.close()

my_local = local()

def init_pool():
    # Create an event loop for each thread in the thread pool
    my_local.loop = Loop()

from utils.telegram_utils import telegram_message

...

counter = count(1)

def open_websites_with_progress(url):
    ...

    if progress in milestones:
        percentage = (progress / total_urls) * 100

        message = str(f"{percentage:.0f}% completato ({progress}/{total_urls})")

        #logger.info(message)  #rda
        if __name__ == '__main__':
            print("PROVA")
            loop = my_local.loop()
            loop.run_until_complete(telegram_message(message))

    ...

with ThreadPoolExecutor(max_workers=3, initializer=init_pool) as executor_2:
    # Iterate the iterator to ensure all submitted tasks have completed
    results = list(executor_2.map(open_websites_with_progress, combined_total_urls))
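A different approach, not part of the answer above, is to keep one event loop for the whole program in a dedicated thread and have the pool workers submit coroutines to it with asyncio.run_coroutine_threadsafe. This is only a sketch under assumptions: fake_send is a hypothetical stand-in for telegram_message so that the example runs without a network, and background_loop, loop_thread and worker are illustrative names. It has not been tested against python-telegram-bot.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor

# One event loop for the whole program, running in its own daemon thread.
background_loop = asyncio.new_event_loop()
loop_thread = threading.Thread(target=background_loop.run_forever, daemon=True)
loop_thread.start()

async def fake_send(message):
    # Hypothetical stand-in for telegram_message(); no network involved.
    await asyncio.sleep(0.01)
    # Report whether this coroutine ran on the shared loop.
    return asyncio.get_running_loop() is background_loop

def worker(n):
    # Called from pool threads: hand the coroutine to the shared loop and
    # block this worker until it finishes (or 5 seconds pass).
    future = asyncio.run_coroutine_threadsafe(fake_send(f"step {n}"), background_loop)
    return future.result(timeout=5)

with ThreadPoolExecutor(max_workers=3) as pool:
    ran_on_shared_loop = list(pool.map(worker, range(6)))

# Stop the loop, wait for its thread to exit, then close the loop.
background_loop.call_soon_threadsafe(background_loop.stop)
loop_thread.join()
background_loop.close()
```

With this layout every coroutine runs on the same loop no matter which worker thread requested it, so an object that is shared between threads is only ever used from one loop.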