Here is my function:
import io

import boto3
import numpy as np

def write_sparse_matrix_to_s3(matrix, bucket_name, key, header=None):
    """
    Writes a sparse matrix in Compressed Sparse Row (CSR) format to an S3 bucket.

    Args:
        matrix (scipy.sparse.csr_matrix): The sparse matrix to be written.
        bucket_name (str): The name of the S3 bucket to write the matrix to.
        key (str): The key (file name) to use for the object in S3.
        header (list, optional): A list of strings representing the header
            information to be saved with the matrix.
    """
    s3 = boto3.client('s3')
    # Save the matrix data to a BytesIO object
    matrix_data = io.BytesIO()
    np.savez(matrix_data, data=matrix.data, indices=matrix.indices,
             indptr=matrix.indptr, shape=matrix.shape)
    body = matrix_data.getvalue()
    # Prepend the header information (if provided)
    if header is not None:
        body = '\n'.join(header).encode('utf-8') + b'\n' + body
    # Upload the object to S3 in a single PUT request
    s3.put_object(Bucket=bucket_name, Key=key, Body=body)
    print(f"Sparse matrix saved to s3://{bucket_name}/{key}")
However, when I run it on large data I get this error: ClientError: An error occurred (EntityTooLarge) when calling the PutObject operation: Your proposed upload exceeds the maximum allowed size.
I need help writing functions that write a large CSR matrix to S3 and read it back from S3.
This error occurs when the object being uploaded to S3 is too large. With a single PUT operation (via the AWS SDKs, the REST API, or the AWS CLI), you can upload an object of at most 5 GB.
Uploading objects - https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
To upload objects larger than 5 GB, use multipart upload, which supports single objects of up to 5 TB.
Using multipart upload to upload and copy objects - https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html
You need all of the following permissions to use upload_file:
"Effect": "Allow",
"Action": [
"s3:AbortMultipartUpload",
"s3:DeleteObject",
"s3:ListMultipartUploadParts",
"s3:PutObject",
"s3:GetObject"
In your code, put_object sends the whole body in a single request, which is limited to 5 GB. If you use upload_file instead, boto3 automatically switches to a multipart upload once the file exceeds the configured threshold (and can report progress through an optional callback). You would use something like the code below:
import os
import tempfile

import boto3
import numpy as np
from boto3.s3.transfer import TransferConfig

def write_sparse_matrix_to_s3(matrix, bucket_name, key, header=None):
    """
    Writes a sparse matrix in Compressed Sparse Row (CSR) format to an S3
    bucket using multipart upload.

    Args:
        matrix (scipy.sparse.csr_matrix): The sparse matrix to be written.
        bucket_name (str): The name of the S3 bucket to write the matrix to.
        key (str): The key (file name) to use for the object in S3.
        header (list, optional): A list of strings representing the header
            information to be saved with the matrix.
    """
    s3 = boto3.client('s3')
    # Save the matrix data to a temporary file (closed before it is reopened below)
    with tempfile.NamedTemporaryFile(delete=False) as temp_file:
        temp_path = temp_file.name
        np.savez(temp_file, data=matrix.data, indices=matrix.indices,
                 indptr=matrix.indptr, shape=matrix.shape)
    # Prepend the header information (if provided)
    if header is not None:
        header_data = '\n'.join(header).encode('utf-8')
        with open(temp_path, 'rb+') as f:
            existing_data = f.read()
            f.seek(0)
            f.write(header_data + b'\n' + existing_data)
    # Files larger than the threshold are sent as a multipart upload
    config = TransferConfig(multipart_threshold=5 * 1024 * 1024)  # 5 MB threshold
    # upload_file switches to multipart automatically based on the config
    s3.upload_file(temp_path, bucket_name, key, Config=config)
    # Delete the temporary file
    os.remove(temp_path)
    print(f"Sparse matrix saved to s3://{bucket_name}/{key}")