在 .py 模块中我创建了这个函数
from utils.telegram_utils import telegram_message
...
counter = count(1)
def open_websites_with_progress(url):
risultato = open_websites(url)
progress = next(counter)
if progress in milestones:
percentage = (progress / total_urls) * 100
message = str(f"{percentage:.0f}% completato ({progress}/{total_urls})")
logger.info(message)
if __name__ == '__main__':
print("PROVA")
asyncio.run(telegram_message(message))
if risultato == 'TimeoutError':
logger.warning(f"TimeoutError: xxxxx{url}")
return
elif risultato == 'GoToError':
logger.error(f"GoToError: xxxxx{url}")
return
elif risultato == 'MatchNotFound':
logger.warning(f"MatchNotFound: xxxxx{url}")
return
return risultato
with ThreadPoolExecutor(max_workers=3) as executor_2:
results = executor_2.map(open_websites_with_progress, combined_total_urls)
遇到的问题是在从另一个模块 utils/telegram_utils.py 调用的函数上
async def telegram_message(message):
await bot.send_message(chat_id=GROUP_CHAT_ID, text=message)
通常如果在 asyncio 中调用就可以工作,但在这种情况下,插入 ThreadPoolExecutor 会给我带来问题:第一次工作时,它在机器人上执行第一个“打印”,但第二次它停止工作并报告以下错误:
PROVA
PROVA
PROVA
Traceback (most recent call last):
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\telegram\request\_baserequest.py", line 333, in _request_wrapper
code, payload = await self.do_request(
^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\telegram\request\_httpxrequest.py", line 292, in do_request
res = await self._client.request(
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpx\_client.py", line 1585, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpx\_client.py", line 1674, in send
response = await self._send_handling_auth(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpx\_client.py", line 1702, in _send_handling_auth
response = await self._send_handling_redirects(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpx\_client.py", line 1739, in _send_handling_redirects
response = await self._send_single_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpx\_client.py", line 1776, in _send_single_request
response = await transport.handle_async_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpx\_transports\default.py", line 377, in handle_async_request
resp = await self._pool.handle_async_request(req)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpcore\_async\connection_pool.py", line 216, in handle_async_request
raise exc from None
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpcore\_async\connection_pool.py", line 189, in handle_async_request
await self._close_connections(closing)
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpcore\_async\connection_pool.py", line 305, in _close_connections
await connection.aclose()
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpcore\_async\connection.py", line 171, in aclose
await self._connection.aclose()
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpcore\_async\http11.py", line 265, in aclose
await self._network_stream.aclose()
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpcore\_backends\anyio.py", line 55, in aclose
await self._stream.aclose()
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\anyio\streams\tls.py", line 202, in aclose
await self.transport_stream.aclose()
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\anyio\_backends\_asyncio.py", line 1258, in aclose
self._transport.close()
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\asyncio\proactor_events.py", line 109, in close
self._loop.call_soon(self._call_connection_lost, None)
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\asyncio\base_events.py", line 795, in call_soon
self._check_closed()
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\asyncio\base_events.py", line 541, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "C:\Users\lucag\Desktop\Exchange-main\Exchange-main\betfair.py", line 190, in <module>
betfair_scraper()
File "C:\Users\lucag\Desktop\Exchange-main\Exchange-main\betfair.py", line 176, in betfair_scraper
for result in results:
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\concurrent\futures\_base.py", line 619, in result_iterator
yield _result_or_cancel(fs.pop())
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\concurrent\futures\_base.py", line 317, in _result_or_cancel
return fut.result(timeout)
^^^^^^^^^^^^^^^^^^^
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\concurrent\futures\_base.py", line 449, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\concurrent\futures\_base.py", line 401, in __get_result
raise self._exception
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\concurrent\futures\thread.py", line 58, in run
result = self.fn(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\lucag\Desktop\Exchange-main\Exchange-main\betfair.py", line 157, in open_websites_with_progress
asyncio.run(telegram_message(message))
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\asyncio\runners.py", line 194, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\asyncio\runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\asyncio\base_events.py", line 687, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "C:\Users\lucag\Desktop\Exchange-main\Exchange-main\utils\telegram_utils.py", line 10, in telegram_message
await bot.send_message(chat_id=GROUP_CHAT_ID, text=message)
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\telegram\_bot.py", line 1029, in send_message
return await self._send_message(
^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\telegram\_bot.py", line 745, in _send_message
result = await self._post(
^^^^^^^^^^^^^^^^^
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\telegram\_bot.py", line 623, in _post
return await self._do_post(
^^^^^^^^^^^^^^^^^^^^
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\telegram\_bot.py", line 652, in _do_post
result = await request.post(
^^^^^^^^^^^^^^^^^^^
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\telegram\request\_baserequest.py", line 201, in post
result = await self._request_wrapper(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\telegram\request\_baserequest.py", line 345, in _request_wrapper
raise NetworkError(f"Unknown error in HTTP implementation: {exc!r}") from exc
telegram.error.NetworkError: Unknown error in HTTP implementation: RuntimeError('Event loop is closed')
我尝试设置一个初始循环然后关闭它,但我做不到或者我不知道该怎么做
很难确定为什么您会收到 RuntimeError: Event Loop is close ,因为您发布的内容远远小于最小的、可重现的示例。但我观察到,每次执行
asyncio.run(telegram_message(message))
时,您都在创建一个新的事件循环。这是低效的。这也可能是您错误的原因吗?我不知道,但也许事件循环正在被传递给其他线程,然后在asyncio.run
完成并且事件循环关闭后尝试在其上运行异步函数。
因此请尝试以下操作,但不能保证这会解决您的问题:为池中的每个线程创建自己的事件循环,该事件循环在池初始化时打开一次,并在池线程终止时关闭。指定一个池初始值设定项函数,该函数将为每个线程创建一个新的事件循环并将其保存在线程本地存储中。事件循环包装在一个简单的类实例中,当线程终止时垃圾收集时,该实例将首先关闭循环:
from threading import local
class Loop():
def __init__(self):
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
def __call__(self):
return self._loop
def __del__(self):
self._loop.close()
my_local = local()
def init_pool():
# Create an event loop for each thread in the thread pool
my_local.loop = Loop()
from utils.telegram_utils import telegram_message
...
counter = count(1)
def open_websites_with_progress(url):
...
if progress in milestones:
percentage = (progress / total_urls) * 100
message = str(f"{percentage:.0f}% completato ({progress}/{total_urls})")
#logger.info(message) #rda
if __name__ == '__main__':
print("PROVA")
loop = my_local.loop()
loop.run_until_complete(telegram_message(message))
...
with ThreadPoolExecutor(max_workers=3, initializer=init_pool) as executor_2:
# Iterate the iterator to ensure all submitted tasks have completed
results = list(executor_2.map(open_websites_with_progress, combined_total_urls))