我正在为 Google Workspace 创建一个自动化脚本,它会获取某个组织单位的直接子级,然后同时获取所有这些 OU 的子级。在网上搜索多处理、线程或异步处理是否最适合我的答案时,我了解到 asyncio 将帮助我解决这个问题。我创建了一个类
Google Tenant
,它保存与 google api 和获取的用户的连接。
但是,我现在的问题是脚本仍然不是异步的,但它按顺序工作而不是异步调用
from google.oauth2 import service_account
from googleapiclient.discovery import build
import logging
import asyncio
class GoogleTenant:
def __init__(self, api: str, version: str):
config: ScriptConfig = ScriptConfig()
credentials = service_account.Credentials.from_service_account_file(config["gcloud"]["keypath"],
scopes=SCOPES)
delegated_credentials = credentials.with_subject(config["gcloud"]["subject"])
self.service = build(api, version, credentials=delegated_credentials)
self.users_list = []
def fetch_users(self):
users_ous = self.service.orgunits().list(customerId="my_customer", orgUnitPath="/example", type="children", fields="organizationUnits/orgUnitPath").execute()
ous_list = [ou["orgUnitPath"] for ou in users_ous["organizationUnits"]]
asyncio.run(self._crawl_ous(ous_list))
async def _crawl_ous(self, ous: list):
crawling_result = await asyncio.gather(*[asyncio.create_task(self._fetch_users_from_ou(ou)) for ou in ous])
for result in crawling_result:
logging.info(f"Crawling result of ou {result[0]["organizations"][0]["department"]}: {len(result)}")
self.users_list.extend(result)
async def _fetch_users_from_ou(self, ou):
call_parameters = {
"customer": "my_customer",
"maxResults": 500,
"projection": "basic",
"query": f"orgUnitPath='{str(ou)}'",
"fields": "users/id,users/name,users/primaryEmail,users/suspended,users/emails,users/organizations/primary,users/organizations/department,users/recoveryEmail,nextPageToken"
}
logger.debug(f"Fetching users from {ou}")
users_from_ou = self.service.users().list(**call_parameters).execute()
user_fetching_result: list = users_from_ou["users"]
logger.debug(f"Initial fetch from {ou}: {len(users_from_ou["users"])}")
if "nextPageToken" in users_from_ou:
next_page_token = users_from_ou["nextPageToken"]
else:
return user_fetching_result
while True:
users_from_ou = self.service.users().list(**call_parameters, pageToken=next_page_token).execute()
logger.debug(f"Next fetch from {ou}: {len(users_from_ou["users"])}")
user_fetching_result.extend(users_from_ou["users"])
if "nextPageToken" in users_from_ou:
next_page_token = users_from_ou["nextPageToken"]
else:
return user_fetching_result
if __name__ == '__main__':
google_tenant = GoogleTenant("admin", "directory_v1")
google_tenant.fetch_users()
执行结果如下:
DEBUG:root:Fetching users from /example/child1
DEBUG:root:Initial fetch from /example/child1: 500
DEBUG:root:Next fetch from /example/child1: 500
DEBUG:root:Next fetch from /example/child1: 500
DEBUG:root:Next fetch from /example/child1: 258
DEBUG:root:Fetching users from /example/child2
DEBUG:root:Initial fetch from /example/child2: 500
DEBUG:root:Next fetch from /example/child2: 500
DEBUG:root:Next fetch from /example/child2: 500
...
我尝试在某些地方输入await语句,但是我似乎误解了它应该如何根据我的理解await语句使函数在继续函数执行之前等待结果。 我怎样才能让Python同时执行这些?
我按照@Michael Butscher的建议重新格式化了部分代码,并在前一个块中添加了我的导入
def fetch_users(self):
users_ous = self.service.orgunits().list(customerId="my_customer", orgUnitPath="/example", type="children", fields="organizationUnits/orgUnitPath").execute()
ous_list = [ou["orgUnitPath"] for ou in users_ous["organizationUnits"]]
logger.debug(f"Fetched and sanitized ous: {pprint.pformat(ous_list)}")
asyncio.run(self._crawl_ous(ous_list))
async def _crawl_ous(self, ous: list):
tasks = [self._crawler_proxy(ou) for ou in ous]
crawling_result = await asyncio.gather(*tasks)
for result in crawling_result:
logger.info(f"Crawling result: {len(result)}")
self.users_list.extend(result)
async def _crawler_proxy(self, *args, **kwargs):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, self._fetch_users_from_ou(*args, **kwargs))
你的罪魁祸首是协程
_fetch_users_from_ou
。它内部没有任何 await
语句,因此这意味着在该函数执行之前,它不允许任何其他任务在其间运行,因此所有任务都在 asyncio.gather
内的 _crawl_ous
调用中按顺序运行。
您希望进行这些更改:
users_from_ou = await asyncio.to_thread( self.service.users().list(**call_parameters).execute)
users_from_ou = await asyncio.to_thread( self.service.users().list(**call_parameters, pageToken=next_page_token).execute)
假设你的 python 版本高于 3.9 以及更多关于它的作用:: https://docs.python.org/3/library/asyncio-task.html#running-in-threads