在循环中使用 Asyncio

问题描述 投票:0回答:1

我创建了一个程序,其中包含两个使用 asyncio 运行的主要异步函数:

  1. get_all_group_members():获取 Entra ID 组 ID 列表,并使用 Microsoft Graph API(使用另一个脚本中的函数)返回该组的成员列表
  2. get_user_name():获取有权访问 Azure 资源的用户列表,并使用 Microsoft Graph API 获取用户的显示名称

如果我使用一份 Entra ID 组列表和一份 Azure 资源用户列表运行我的程序一次,一切都会正常工作,我会从 Graph API 获得我期望的结果。但是,如果我随后将代码放入一个循环中,该循环对多个组列表和多个 Azure 资源用户列表执行相同的操作,则仅在循环的第二次迭代中才会失败,并出现错误

Event loop is closed
。无论我对 asyncio 使用哪种方法,我都会在循环的第二次迭代中遇到此错误。

我想知道如何使用 asyncio 执行单独的 Graph API 异步调用并使第二个 API 调用循环正常工作。我需要能够循环遍历资源/组列表并获取所有资源/组的数据,而不仅仅是一次运行一个组/列表。

我尝试过的(这些都不适合我):

  1. 使用 asyncio.gather() 然后使用 asyncio.run()
async def get_all_group_members():
    memberListAll = []
    for group in groupList:
        results = await gq.get_group_user(group['GroupID'], graphServiceClient)
        memberDict = {"GroupName":group["GroupName"], "GroupID":group["GroupID"], "MemberList":results}
        memberListAll.append(memberDict)
    return memberListAll

# This function will get the display name for an Entra ID user based on the object ID for that account
async def get_user_name():
    ownerUsernameList = []
    for owner in ownersList:
        if owner['principalType'] == 'User':
            results = await gq.get_user_by_id(userID=owner['principalID'],graphServiceClient=graphServiceClient)
            ownerUsernameList.append(results)
    return ownerUsernameList

async def run_async_functions():
    groupResults, usernameResults = await asyncio.gather(get_all_group_members(), get_user_name())
    return groupResults, usernameResults

groupResults, usernameResults = asyncio.run(run_async_functions())
  1. 创建我自己的循环,打开循环,使用循环,然后关闭循环。在处理我的列表/调用图形 API 的第二次迭代中,我的循环会说它已打开,但当它运行异步函数时我仍然会收到上述错误。
asyncio.set_event_loop(asyncio.new_event_loop())
newLoop = asyncio.get_event_loop()
isOpen = asyncio.get_event_loop().is_closed()
print("Is the loop open? ", not isOpen)
    >>>> Is the loop open? True

async def get_all_group_members():
    memberListAll = []
    for group in groupList:
        results = await gq.get_group_user(group['GroupID'], graphServiceClient) # Calls function from other script I wrote
        memberDict = {"GroupName":group["GroupName"], "GroupID":group["GroupID"], "MemberList":results}
        memberListAll.append(memberDict)
    return memberListAll
        
async def get_user_name():
    ownerUsernameList = []
    for owner in ownersList:
        if owner['principalType'] == 'User':
            results = await gq.get_user_by_id(userID=owner['principalID'],graphServiceClient=graphServiceClient)
            ownerUsernameList.append(results)
    return ownerUsernameList
        
# This function calls both of the above async functions so that we can have one function for asyncio to run
async def run_async_functions():
    groupResults = await get_all_group_members()
    usernameResults = await get_user_name()
    return groupResults, usernameResults

groupResults, usernameResults = newLoop.run_until_complete(run_async_functions())

newLoop.close()
isClosed = asyncio.get_event_loop().is_closed()
print("Is the loop closed? ", str(isClosed))
    >>>> Is the loop closed? True
  1. 为第一个异步函数创建一个单独的循环,关闭该循环,然后为第二个异步函数创建第二个循环。然而,我仍然遇到相同错误的失败,但仅在我的脚本的第二次迭代处理第二个组/用户列表时发生。
async def get_all_group_members():
    memberListAll = []
    for group in groupList:
        results = await gq.get_group_user(group['GroupID'], graphServiceClient)
        memberDict = {"GroupName":group["GroupName"], "GroupID":group["GroupID"], "MemberList":results}
        memberListAll.append(memberDict)
    return memberListAll
        
async def get_user_name():
    ownerUsernameList = []
    for owner in ownersList:
        if owner['principalType'] == 'User':
            results = await gq.get_user_by_id(userID=owner['principalID'],graphServiceClient=graphServiceClient)
            ownerUsernameList.append(results)
    return ownerUsernameList
    
groupLoop = asyncio.new_event_loop()
asyncio.set_event_loop(groupLoop)
isOpen = asyncio.get_event_loop().is_closed()
print("Is the loop open? ", not isOpen)
    >>>> Is the loop open? True

groupResults = groupLoop.run_until_complete(get_all_group_members())

groupLoop.close()
isClosed = asyncio.get_event_loop().is_closed()
print("Is the loop closed? ", str(isClosed))
    >>>> Is the loop closed? True

usernameLoop = asyncio.new_event_loop()
asyncio.set_event_loop(usernameLoop)
isOpen = asyncio.get_event_loop().is_closed()
print("Is the loop open? ", not isOpen)
    >>>> Is the loop open? True

usernameResults = usernameLoop.run_until_complete(get_user_name())

usernameLoop.close()
isClosed = asyncio.get_event_loop().is_closed()
print("Is the loop closed? ", str(isClosed))
    >>>> Is the loop closed? True

那么,当我尝试处理第二个组和用户列表时,如何成功地让这些异步函数正常工作,并且在程序的第二次迭代中不会出错?为什么,即使我专门打开一个循环并且它说它已打开,我是否会收到“事件循环已关闭”错误?

python python-asyncio
1个回答
0
投票

该问题似乎与您正在使用的库以及它建议如何设置客户端对象有关。它建议在模块导入时创建客户端。例如。

client = ...

async def some_function():
    return await client.do_something()

asyncio.run(some_function())

这种方法的问题是客户端嵌入了创建它时就存在的异步循环。因此,当它在新的事件循环上运行时,它仍然尝试使用旧的(封闭的)事件循环。

asyncio.run()
和您的其他方法都会在每次运行时创建一个新的事件循环,因此保证在第二次运行时失败。

建议的解决方案是每次在

async
函数内创建新的客户端。通过在
async
函数内创建客户端,您可以强制客户端链接到当前正在执行的事件循环。

可用于修改代码的基本结构是使用

contextvars
模块。它将允许您获取每个函数来获取其事件循环本地的客户端,并对当前代码进行最少的更改。

import asyncio
import contextvars
import random

client = contextvars.ContextVar('client')

async def job():
    print(f'{client.get()=}')

async def main():
    client.set(random.random())
    await job()

asyncio.run(main())
asyncio.run(main())

将打印类似以下内容:

client.get()=0.8650939974300063
client.get()=0.20164744752604813

有关更多详细信息,请参阅:https://github.com/microsoftgraph/msgraph-sdk-python/issues/366

© www.soinside.com 2019 - 2024. All rights reserved.