我创建了一个程序,其中包含两个使用 asyncio 运行的主要异步函数:
如果我使用一份 Entra ID 组列表和一份 Azure 资源用户列表运行我的程序一次,一切都会正常工作,我会从 Graph API 获得我期望的结果。但是,如果我随后将代码放入一个循环中,该循环对多个组列表和多个 Azure 资源用户列表执行相同的操作,则仅在循环的第二次迭代中才会失败,并出现错误
Event loop is closed
。无论我对 asyncio 使用哪种方法,我都会在循环的第二次迭代中遇到此错误。
我想知道如何使用 asyncio 执行单独的 Graph API 异步调用并使第二个 API 调用循环正常工作。我需要能够循环遍历资源/组列表并获取所有资源/组的数据,而不仅仅是一次运行一个组/列表。
我尝试过的(这些都不适合我):
async def get_all_group_members():
memberListAll = []
for group in groupList:
results = await gq.get_group_user(group['GroupID'], graphServiceClient)
memberDict = {"GroupName":group["GroupName"], "GroupID":group["GroupID"], "MemberList":results}
memberListAll.append(memberDict)
return memberListAll
# This function will get the display name for an Entra ID user based on the object ID for that account
async def get_user_name():
ownerUsernameList = []
for owner in ownersList:
if owner['principalType'] == 'User':
results = await gq.get_user_by_id(userID=owner['principalID'],graphServiceClient=graphServiceClient)
ownerUsernameList.append(results)
return ownerUsernameList
async def run_async_functions():
groupResults, usernameResults = await asyncio.gather(get_all_group_members(), get_user_name())
return groupResults, usernameResults
groupResults, usernameResults = asyncio.run(run_async_functions())
asyncio.set_event_loop(asyncio.new_event_loop())
newLoop = asyncio.get_event_loop()
isOpen = asyncio.get_event_loop().is_closed()
print("Is the loop open? ", not isOpen)
>>>> Is the loop open? True
async def get_all_group_members():
memberListAll = []
for group in groupList:
results = await gq.get_group_user(group['GroupID'], graphServiceClient) # Calls function from other script I wrote
memberDict = {"GroupName":group["GroupName"], "GroupID":group["GroupID"], "MemberList":results}
memberListAll.append(memberDict)
return memberListAll
async def get_user_name():
ownerUsernameList = []
for owner in ownersList:
if owner['principalType'] == 'User':
results = await gq.get_user_by_id(userID=owner['principalID'],graphServiceClient=graphServiceClient)
ownerUsernameList.append(results)
return ownerUsernameList
# This function calls both of the above async functions so that we can have one function for asyncio to run
async def run_async_functions():
groupResults = await get_all_group_members()
usernameResults = await get_user_name()
return groupResults, usernameResults
groupResults, usernameResults = newLoop.run_until_complete(run_async_functions())
newLoop.close()
isClosed = asyncio.get_event_loop().is_closed()
print("Is the loop closed? ", str(isClosed))
>>>> Is the loop closed? True
async def get_all_group_members():
memberListAll = []
for group in groupList:
results = await gq.get_group_user(group['GroupID'], graphServiceClient)
memberDict = {"GroupName":group["GroupName"], "GroupID":group["GroupID"], "MemberList":results}
memberListAll.append(memberDict)
return memberListAll
async def get_user_name():
ownerUsernameList = []
for owner in ownersList:
if owner['principalType'] == 'User':
results = await gq.get_user_by_id(userID=owner['principalID'],graphServiceClient=graphServiceClient)
ownerUsernameList.append(results)
return ownerUsernameList
groupLoop = asyncio.new_event_loop()
asyncio.set_event_loop(groupLoop)
isOpen = asyncio.get_event_loop().is_closed()
print("Is the loop open? ", not isOpen)
>>>> Is the loop open? True
groupResults = groupLoop.run_until_complete(get_all_group_members())
groupLoop.close()
isClosed = asyncio.get_event_loop().is_closed()
print("Is the loop closed? ", str(isClosed))
>>>> Is the loop closed? True
usernameLoop = asyncio.new_event_loop()
asyncio.set_event_loop(usernameLoop)
isOpen = asyncio.get_event_loop().is_closed()
print("Is the loop open? ", not isOpen)
>>>> Is the loop open? True
usernameResults = usernameLoop.run_until_complete(get_user_name())
usernameLoop.close()
isClosed = asyncio.get_event_loop().is_closed()
print("Is the loop closed? ", str(isClosed))
>>>> Is the loop closed? True
那么,当我尝试处理第二个组和用户列表时,如何成功地让这些异步函数正常工作,并且在程序的第二次迭代中不会出错?为什么,即使我专门打开一个循环并且它说它已打开,我是否会收到“事件循环已关闭”错误?
该问题似乎与您正在使用的库以及它建议如何设置客户端对象有关。它建议在模块导入时创建客户端。例如。
client = ...
async def some_function():
return await client.do_something()
asyncio.run(some_function())
这种方法的问题是客户端嵌入了创建它时就存在的异步循环。因此,当它在新的事件循环上运行时,它仍然尝试使用旧的(封闭的)事件循环。
asyncio.run()
和您的其他方法都会在每次运行时创建一个新的事件循环,因此保证在第二次运行时失败。
建议的解决方案是每次在
async
函数内创建新的客户端。通过在 async
函数内创建客户端,您可以强制客户端链接到当前正在执行的事件循环。
可用于修改代码的基本结构是使用
contextvars
模块。它将允许您获取每个函数来获取其事件循环本地的客户端,并对当前代码进行最少的更改。
import asyncio
import contextvars
import random
client = contextvars.ContextVar('client')
async def job():
print(f'{client.get()=}')
async def main():
client.set(random.random())
await job()
asyncio.run(main())
asyncio.run(main())
将打印类似以下内容:
client.get()=0.8650939974300063
client.get()=0.20164744752604813
有关更多详细信息,请参阅:https://github.com/microsoftgraph/msgraph-sdk-python/issues/366