我正在尝试创建一个 token.json 文件来保存数据,这样就不必每次都登录来批准帐户。下面是一次批量删除 100 个文件的代码。每次运行代码时,我都必须登录帐户进行审批。需要做的只是添加保存 token.json 文件的功能。
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.http import BatchHttpRequest
# OAuth scopes passed to the authorization flow in main().
SCOPES = ['https://www.googleapis.com/auth/drive']
def callback(request_id, response, exception):
    """Report the outcome of one request in a batch.

    Args:
        request_id: Identifier of the batched request. The success message
            reports it as the deleted file's ID, so requests should be added
            to the batch with the file ID as their request ID.
        response: Response of the individual request (unused).
        exception: The error raised for this request, or a falsy value when
            the request succeeded.
    """
    if exception:
        print(f"An error occurred: {exception}")
    else:
        print(f"Deleted file ID: {request_id}")
def main():
    """Authorize against Google Drive and delete up to 100 files in one batch.

    The interactive OAuth flow runs on every invocation; nothing is cached
    between runs, so the user has to approve access each time.
    """
    flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
    creds = flow.run_local_server(port=0)
    service = build('drive', 'v3', credentials=creds)

    # Only the first page of results (at most 100 file IDs) is fetched.
    results = service.files().list(
        pageSize=100, fields="nextPageToken, files(id)").execute()
    items = results.get('files', [])
    if not items:
        print('No files found.')
        return

    batch = service.new_batch_http_request(callback=callback)
    for item in items:
        file_id = item['id']
        # Use the file ID as the request ID so the callback reports the
        # actual file rather than an auto-generated batch sequence number.
        batch.add(service.files().delete(fileId=file_id), request_id=file_id)
    batch.execute()
# Run the deletion only when executed as a script, not on import.
if __name__ == '__main__':
    main()
我将创建一个函数
get_credentials
,您可以调用该函数来获取构建服务所需的凭据,如下所示:
import os.path
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
def get_credentials(*,
                    scopes: "list | None" = None,
                    token_path: str = 'token.json',
                    credentials_path: str = 'credentials.json'
                    ) -> Credentials:
    """Return usable credentials, reusing a saved token when possible.

    Args:
        scopes: OAuth scopes to request. Defaults to the full Google Drive
            scope when omitted.
        token_path: File holding the saved authorized-user token. It may not
            exist yet; it is (re)written whenever new or refreshed
            credentials are obtained.
        credentials_path: Client secrets file used for the interactive
            authorization flow.

    Returns:
        A `Credentials` instance: the saved one if still valid, otherwise a
        refreshed or newly authorized one.
    """
    if scopes is None:
        # Build the default per call instead of sharing one mutable list
        # object across all calls.
        scopes = ['https://www.googleapis.com/auth/drive']

    def recertify():
        """Create a new Credentials instance using InstalledAppFlow."""
        flow = InstalledAppFlow.from_client_secrets_file(
            credentials_path, scopes)
        return flow.run_local_server(port=0)

    creds = None
    # The token file stores the user's access and refresh tokens, and is
    # created when the authorization flow completes for the first time.
    if os.path.exists(token_path):
        try:
            # We have a token file. Recreate the credentials.
            creds = Credentials.from_authorized_user_file(token_path, scopes)
        except ValueError:
            # Malformed JSON or missing fields: treat the file as absent and
            # fall through to a fresh authorization, which overwrites it.
            creds = None
        if creds is not None and creds.valid:
            # We have valid credentials; nothing to refresh or save.
            return creds

    # Either there is no usable token file or the credentials are no longer
    # valid.
    if creds and creds.expired and creds.refresh_token:
        # The credentials have expired. Try to refresh them.
        try:
            creds.refresh(Request())
        except Exception:
            # Deliberately broad: whatever made the refresh fail (probably an
            # expired or revoked refresh token), start the flow anew.
            creds = recertify()
    else:
        creds = recertify()

    # Save the credentials for the next run.
    with open(token_path, 'w') as token:
        token.write(creds.to_json())
    return creds
def main():
    """Build the Drive v3 service from cached (or newly obtained) credentials."""
    creds = get_credentials()  # Take defaults
    service = build('drive', 'v3', credentials=creds)
    ...  # etc.
# Run only when executed as a script, not on import.
if __name__ == '__main__':
    main()