I have to download files from an external source and upload them to an S3 bucket. So far this works for small files, but larger files never seem to upload. I don't get any error logs; the object just stays stuck at a very small size, e.g. 39.5 KB instead of 49 MB.
Here is the code I'm using:
config = TransferConfig(multipart_threshold=1024 * 25, max_concurrency=10,
                        multipart_chunksize=1024 * 25, use_threads=True)

with requests.get(url, stream=True) as r:
    s3.meta.client.upload_fileobj(r.raw, 'bucket_name', 'key_name', Config=config)
I also tried a variant using
data = BytesIO(r.content)
and passing that instead of r.raw, and I tried the smart_open Python library, but either I'm not using it well or it just doesn't do what I need. Any ideas are appreciated.
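For reference, the smart_open variant I tried was roughly along the lines of the sketch below (the url, bucket, and key are placeholders, and the exact call may not match what I had):

import boto3
import requests
from smart_open import open as s3_open

url = 'https://example.com/large-file'  # placeholder download URL

# Stream the download straight into S3 instead of buffering it all in memory
with requests.get(url, stream=True) as r:
    r.raise_for_status()
    with s3_open('s3://bucket_name/key_name', 'wb',
                 transport_params={'client': boto3.client('s3')}) as fout:
        for chunk in r.iter_content(chunk_size=1024 * 1024):
            fout.write(chunk)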
S3 lets you split a file like this into smaller parts. You upload each part in turn, and S3 then combines them into the final object. You can use the
FileChunkIO
module for this, so pip install FileChunkIO
if it isn't installed already.
For more details, go here.
Here is a complete working example of a file-upload script that uses the boto3 library for the AWS S3 interaction, plus the filechunkio and tqdm libraries for efficient chunk handling and an optional progress bar.
from io import BytesIO
import math
import os

import boto3
from filechunkio import FileChunkIO
from tqdm import tqdm  # tqdm provides the optional progress bar

# Chunk size for uploading in parts (10 MB per part)
chunk_size_bytes = 1024 * 1024 * 10

# AWS S3 configuration
session = boto3.Session(
    aws_access_key_id='your_access_key',
    aws_secret_access_key='your_secret_key'
)
objects = session.client(service_name="s3", endpoint_url="https://your-s3-endpoint.com/", use_ssl=True)

# Local file path
source_path = '/path/to/video.mp4'  # Full path to the local file

# Extract the filename from the path to use as the object key
key_file = os.path.basename(source_path)

# Target S3 bucket
target_bucket = "videos"  # Name of the Amazon S3 bucket

# Content type of the file
content_type = "video/mp4"  # MIME type or content type of the file

# Create a multipart upload
response = objects.create_multipart_upload(
    ACL="public-read",
    Bucket=target_bucket,
    ContentType=content_type,
    Key=key_file
)
upload_id = response['UploadId']

# Initialize part number and parts list
part_number = 1
parts = []

try:
    # Total file size and number of parts, for the tqdm progress bar
    total_size = os.path.getsize(source_path)
    total_parts = math.ceil(total_size / chunk_size_bytes)

    # Open the local file using FileChunkIO for efficient handling of large files
    with FileChunkIO(source_path, 'rb', offset=0, closefd=True) as fd:
        for data in tqdm(iter(lambda: fd.read(chunk_size_bytes), b""),
                         total=total_parts, unit="part", leave=False, dynamic_ncols=True):
            # Upload each part and remember its ETag for the completion call
            part = objects.upload_part(
                Bucket=target_bucket,
                Key=key_file,
                Body=BytesIO(data),
                PartNumber=part_number,
                UploadId=upload_id
            )
            parts.append({"PartNumber": part_number, "ETag": part["ETag"]})
            part_number += 1

    # Complete the multipart upload: S3 assembles the parts into the final object
    objects.complete_multipart_upload(
        Bucket=target_bucket,
        Key=key_file,
        UploadId=upload_id,
        MultipartUpload={"Parts": parts}
    )
except Exception as e:
    # Handle any exceptions, such as cleanup or logging
    print(f"Error: {e}")
    # Abort the multipart upload so the orphaned parts are not left behind
    objects.abort_multipart_upload(Bucket=target_bucket, Key=key_file, UploadId=upload_id)
    raise  # Re-raise the exception after cleanup
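Since in your case the data comes from an HTTP download rather than a local file, the same multipart flow can be fed straight from the requests stream without saving to disk first. Below is a minimal sketch of that adaptation; the url, bucket_name, and key_name values are placeholders and a default boto3 client is assumed. It buffers the download before each upload_part call because iter_content can yield pieces smaller than requested, and every part except the last must be at least 5 MB.

import boto3
import requests

part_size = 1024 * 1024 * 10            # 10 MB parts (S3 minimum is 5 MB for all but the last)
url = 'https://example.com/large-file'  # placeholder download URL
target_bucket = 'bucket_name'           # placeholder bucket
key_file = 'key_name'                   # placeholder object key

s3_client = boto3.client('s3')

# Start the multipart upload
upload_id = s3_client.create_multipart_upload(Bucket=target_bucket, Key=key_file)['UploadId']
parts = []
part_number = 1

try:
    buffer = bytearray()
    with requests.get(url, stream=True) as r:
        r.raise_for_status()
        for chunk in r.iter_content(chunk_size=1024 * 64):
            buffer.extend(chunk)
            # Flush a full part as soon as the buffer reaches part_size
            while len(buffer) >= part_size:
                part = s3_client.upload_part(
                    Bucket=target_bucket, Key=key_file,
                    PartNumber=part_number, UploadId=upload_id,
                    Body=bytes(buffer[:part_size])
                )
                parts.append({"PartNumber": part_number, "ETag": part["ETag"]})
                part_number += 1
                del buffer[:part_size]

    if buffer:
        # Whatever is left becomes the final (possibly short) part
        part = s3_client.upload_part(
            Bucket=target_bucket, Key=key_file,
            PartNumber=part_number, UploadId=upload_id,
            Body=bytes(buffer)
        )
        parts.append({"PartNumber": part_number, "ETag": part["ETag"]})

    # Stitch the uploaded parts into the final object
    s3_client.complete_multipart_upload(
        Bucket=target_bucket, Key=key_file, UploadId=upload_id,
        MultipartUpload={"Parts": parts}
    )
except Exception:
    # Abort so the incomplete parts are not left behind
    s3_client.abort_multipart_upload(Bucket=target_bucket, Key=key_file, UploadId=upload_id)
    raise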