更新
解决此问题的方法是在获取行asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
之前添加loop = asyncio.get_event_loop()
:我正在查看代码,还看到另一行对我有帮助。 试试这个代替 我希望这将asyncio.set_event_loop(asyncio.ProactorEventLoop()) 帮助您– Gerrit Geeraerts
我在正确使用我的代码时遇到了一些困难,因为在VSCode上调试时我的代码完成执行后出现以下错误:
Exception ignored in: <function _ProactorBasePipeTransport.__del__ at 0x00000188AB3259D0> Traceback (most recent call last): File "c:\users\gam3p\appdata\local\programs\python\python38\lib\asyncio\proactor_events.py", line 116, in __del__ self.close() File "c:\users\gam3p\appdata\local\programs\python\python38\lib\asyncio\proactor_events.py", line 108, in close self._loop.call_soon(self._call_connection_lost, None) File "c:\users\gam3p\appdata\local\programs\python\python38\lib\asyncio\base_events.py", line 719, in call_soon self._check_closed() File "c:\users\gam3p\appdata\local\programs\python\python38\lib\asyncio\base_events.py", line 508, in _check_closed raise RuntimeError('Event loop is closed') RuntimeError: Event loop is closed
如果我使用命令行运行代码,也会收到以下错误:
Cancelling an overlapped future failed future: <_OverlappedFuture pending overlapped=<pending, 0x25822633550> cb=[_ProactorReadPipeTransport._loop_reading()]> Traceback (most recent call last): File "c:\users\gam3p\appdata\local\programs\python\python38\lib\asyncio\windows_events.py", line 66, in _cancel_overlapped self._ov.cancel() OSError: [WinError 6] The handle is invalid
以下代码是Reddit的异步API包装器。由于Reddit要求漫游器限制为每分钟60个请求,因此我决定在单独的线程中实现一个处理循环的队列,以处理请求。
要运行它,您需要create a Reddit app并使用您的登录凭据以及机器人ID和密码。
如果有帮助,我正在Windows 10上使用Python 3.8.3 64位。
import requests
import aiohttp
import asyncio
import threading
from types import SimpleNamespace
from time import time
oauth_url = 'https://oauth.reddit.com/'
base_url = 'https://www.reddit.com/'
agent = 'windows:reddit-async:v0.1 (by /u/UrHyper)'
class Reddit:
def __init__(self, username: str, password: str, app_id: str, app_secret: str):
data = {'grant_type': 'password',
'username': username, 'password': password}
auth = requests.auth.HTTPBasicAuth(app_id, app_secret)
response = requests.post(base_url + 'api/v1/access_token',
data=data,
headers={'user-agent': agent},
auth=auth)
self.auth = response.json()
if 'error' in self.auth:
msg = f'Failed to authenticate: {self.auth["error"]}'
if 'message' in self.auth:
msg += ' - ' + self.auth['message']
raise ValueError(msg)
token = 'bearer ' + self.auth['access_token']
self.headers = {'Authorization': token, 'User-Agent': agent}
self._rate = 1
self._last_loop_time = 0.0
self._loop = asyncio.new_event_loop()
self._queue = asyncio.Queue(0)
self._end_loop = False
self._loop_thread = threading.Thread(target=self._start_loop_thread)
self._loop_thread.start()
def stop(self):
self._end_loop = True
def __del__(self):
self.stop()
def _start_loop_thread(self):
asyncio.set_event_loop(self._loop)
self._loop.run_until_complete(self._process_queue())
async def _process_queue(self):
while True:
if self._end_loop and self._queue.empty():
await self._queue.join()
break
start_time = time()
if self._last_loop_time < self._rate:
await asyncio.sleep(self._rate - self._last_loop_time)
try:
queue_item = self._queue.get_nowait()
url = queue_item['url']
callback = queue_item['callback']
data = await self._get_data(url)
self._queue.task_done()
callback(data)
except asyncio.QueueEmpty:
pass
finally:
self._last_loop_time = time() - start_time
async def _get_data(self, url):
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=self.headers) as response:
assert response.status == 200
data = await response.json()
data = SimpleNamespace(**data)
return data
async def get_bot(self, callback: callable):
url = oauth_url + 'api/v1/me'
await self._queue.put({'url': url, 'callback': callback})
async def get_user(self, user: str, callback: callable):
url = oauth_url + 'user/' + user + '/about'
await self._queue.put({'url': url, 'callback': callback})
def callback(data): print(data['name'])
async def main():
reddit = Reddit('', '', '', '')
await reddit.get_bot(lambda bot: print(bot.name))
reddit.stop()
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
更新此问题的解决方案是在获取线路循环= asyncio.get_event_loop()之前添加asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy():我正在寻找...