我希望将(可能很大的)文件(最大 5 GB)上传到 Amazon S3 和阿里巴巴 OSS,并在上传时进行验证——最好是 MD5 验证。
我的代码目前看起来像:
#!/usr/bin/env python3.10
"""
This toy program is an exploration of how to do verified, multipart file uploads to AWS S3 and Alibaba OSS.
It's a modified version of https://stackoverflow.com/questions/58921396/boto3-multipart-upload-and-md5-checking
"""
import os
import sys
import hashlib
import boto3
from botocore.exceptions import ClientError
from botocore.client import Config
from boto3.s3.transfer import TransferConfig
# Multipart part size: 2**23 bytes = 8 MiB (matches boto3's TransferConfig below).
chunk_size = 2**23
# This function is a re-worked function taken from here: https://stackoverflow.com/questions/43794838/multipart-upload-to-s3-with-hash-verification
# Credits to user: https://stackoverflow.com/users/518169/hyperknot
def calculate_s3_etag(file_path, chunk_size=2**23):
    """Calculate the S3/OSS multipart ETag expected for a local file.

    For a single-part upload the ETag is simply the MD5 hexdigest of the
    file.  For multipart uploads it is the MD5 of the concatenated per-part
    MD5 digests, suffixed with "-<number of parts>".

    :param file_path: path of the local file to hash
    :param chunk_size: part size in bytes; default mirrors the module-level
        chunk_size constant (8 MiB) used for the actual upload
    :return: the expected ETag string, lowercase hex
    :raises ValueError: if the file is empty (no parts to hash)
    """
    chunk_md5s = []
    with open(file_path, "rb") as fp:
        # Walrus loop replaces the while True / break pattern (file targets 3.10).
        while data := fp.read(chunk_size):
            chunk_md5s.append(hashlib.md5(data))
    num_hashes = len(chunk_md5s)
    if not num_hashes:
        # BUG FIX: was a bare `raise ValueError` with a placeholder comment;
        # raise with a message so callers can tell what went wrong.
        raise ValueError(f"cannot compute a multipart ETag for empty file: {file_path}")
    if num_hashes == 1:
        # Single part: the ETag is just the plain MD5 hexdigest.
        return chunk_md5s[0].hexdigest()
    digest_byte_string = b"".join(m.digest() for m in chunk_md5s)
    digests_md5 = hashlib.md5(digest_byte_string)
    return f"{digests_md5.hexdigest().lower()}-{num_hashes}"
def s3_md5sum(bucket_name, resource_name, client):
    """Return the lowercased, unquoted ETag of an uploaded object.

    HEADs the object and strips the literal double quotes S3/OSS wrap
    around the ETag value.

    :param bucket_name: bucket the object lives in
    :param resource_name: object key
    :param client: a boto3 S3 client
    :return: the ETag with surrounding quotes removed, lowercased
    :raises ClientError: propagated unchanged if the HEAD request fails
    """
    try:
        et = client.head_object(Bucket=bucket_name, Key=resource_name)["ETag"]
        # ETags come back quoted, e.g. '"9b2cf535..."'.
        assert et[0] == '"' and et[-1] == '"'
        return et[1:-1].lower()
    except ClientError:
        # BUG FIX: `raise ClientError` raised the *class*, which fails with
        # TypeError because ClientError requires (error_response,
        # operation_name).  A bare `raise` re-raises the original exception.
        raise
def upload_one_file(
    *,
    filename,
    endpoint_url,
    aws_credentials,
    aws_region,
    bucket,
    addressing_style=None,
):
    """Upload one file (up to 5 GB) to an S3-compatible service and verify it.

    Builds a boto3 client from the supplied region/credentials/endpoint,
    performs a multipart upload, then computes the expected ETag locally
    and fetches the remote one for comparison.

    :return: (locally computed ETag, ETag reported by the service)
    """
    botocore_config = {"region_name": aws_region}
    if addressing_style:
        # e.g. "virtual" hosted-style addressing, required by Alibaba OSS.
        botocore_config["s3"] = {"addressing_style": addressing_style}
    client_kwargs = dict(aws_credentials)
    client_kwargs["config"] = Config(**botocore_config)
    if endpoint_url:
        client_kwargs["endpoint_url"] = endpoint_url
    client = boto3.client("s3", **client_kwargs)
    # Force the same part size we use when predicting the ETag locally.
    transfer_config = TransferConfig(multipart_chunksize=chunk_size)
    client.upload_file(filename, bucket, filename, Config=transfer_config)
    local_etag = calculate_s3_etag(filename)
    remote_etag = s3_md5sum(bucket, filename, client)
    return (local_etag, remote_etag)
class AuthorizationData:
    """Bundle of connection settings for one S3-compatible storage service."""

    def __init__(
        self, *, name, bucket, aws_region, aws_access_key_id, aws_secret_access_key, endpoint_url=None, addressing_style=None
    ):
        """Record the service name, bucket, region, credentials and endpoint options."""
        self.name = name
        self.bucket = bucket
        self.aws_region = aws_region
        self.endpoint_url = endpoint_url
        self.addressing_style = addressing_style
        # Stored in exactly the keyword form boto3.client("s3", ...) expects.
        self.aws_credentials = {
            "aws_access_key_id": aws_access_key_id,
            "aws_secret_access_key": aws_secret_access_key,
        }
def create_test_inputs():
    """Create the random test files used by the upload checks, if absent.

    Each file is filled from /dev/urandom in fixed-size blocks; existing
    files are left untouched (remove a file manually to regenerate it).
    """
    specs = (
        ("dastromberg-urandom-1k", 2**10, 2**10),
        ("dastromberg-urandom-16M", 2**20, 2**24),
        ("dastromberg-urandom-256M", 2**20, 2**28),
        ("dastromberg-urandom-1G", 2**20, 2**30),
        ("dastromberg-urandom-5G", 2**20, 5 * 2**30),
    )
    for filename, blocksize, total_size in specs:
        if os.path.isfile(filename):
            continue
        print("Creating test input %s" % filename)
        written = 0
        with (
            open("/dev/urandom", "rb") as source,
            open(filename, "wb") as sink,
        ):
            while written < total_size:
                block = source.read(blocksize)
                sink.write(block)
                # /dev/urandom is expected to always satisfy a full read.
                assert len(block) == blocksize
                written += len(block)
            assert written == total_size
            assert os.path.getsize(filename) == total_size, "Final length does not match. Perhaps remove %s and try again?" % filename
def main():
    """Test some AWS S3- and Alibaba OSS-use."""
    # AWS target -- this combination verifies for every test input.
    aws = AuthorizationData(
        name="AWS",
        bucket="aaa",
        aws_region="us-west-2",
        aws_access_key_id="bbb",
        aws_secret_access_key=os.environ["aws_secret_key"],
    )
    # Alibaba OSS target, accessed through boto3's S3 compatibility.
    alibaba = AuthorizationData(
        name="Alibaba",
        endpoint_url="https://oss-cn-hangzhou.aliyuncs.com",
        bucket="ccc",
        aws_region="cn-hangzhou",
        aws_access_key_id="ddd",
        aws_secret_access_key=os.environ["alibaba_secret_key"],
        addressing_style="virtual",
    )
    create_test_inputs()
    # The 1G and 5G inputs are generated but skipped here to keep runs short.
    test_files = (
        "dastromberg-urandom-1k",
        "dastromberg-urandom-16M",
        "dastromberg-urandom-256M",
    )
    all_good = True
    for service in (aws, alibaba):
        for filename in test_files:
            print(f"Checking {service.name}: (unknown)")
            (tag, result) = upload_one_file(
                filename=filename,
                endpoint_url=service.endpoint_url,
                aws_credentials=service.aws_credentials,
                aws_region=service.aws_region,
                bucket=service.bucket,
                addressing_style=service.addressing_style,
            )
            if tag == result:
                print("Verification succeeded: %s, %s" % (tag, result))
            else:
                all_good = False
                print("Verification failed: %s, %s" % (tag, result), file=sys.stderr)
    if all_good:
        print("All tests passed")
    else:
        print("One or more tests failed", file=sys.stderr)
        sys.exit(1)
if __name__ == "__main__":
main()
运行它时,AWS S3 验证所有测试输入,但阿里巴巴仅验证 1 KB 测试 - 16 MB 测试失败,更大的测试也是如此。
在 https://www.alibabacloud.com/help/en/oss/use-cases/can-i-use-etag-values-as-oss-md5-hashes-to-check-data-consistency 阿里巴巴说他们不建议使用MD5进行验证。
在 https://www.alibabacloud.com/help/en/oss/use-cases/check-data-transmission-integrity-by-using-crc-64 中,他们似乎相对彻底地探讨了如何使用 x-oss-hash-crc64ecma 头。我更愿意使用 MD5(配合 boto3),但如果必要的话也可以使用 crc64ecma(希望仍然能配合 boto3 使用)。
有人研究过这个吗?
有什么建议吗?
事实证明,如果你愿意只上传小文件,或者愿意跳过验证,是可以通过 boto3 访问阿里巴巴 OSS 的。
我最终不得不使用阿里巴巴的oss2模块来获取大文件和验证上传。
这是使用 oss2 模块的示例代码:
#!/usr/local/cpython-3.10/bin/python3
# -*- coding: utf-8 -*-
"""
The following code shows how to use data verification when uploading/downloading using MD5.
For MD5 verification, users need to calculate the MD5 value of the uploaded content and put it in the `Content-MD5` of the header.
Note: Resumable upload does not support MD5. Other uploads/downloads support MD5 verification.
Based on https://github.com/aliyun/aliyun-oss-python-sdk/blob/master/examples/object_check.py
"""
import base64
import hashlib
import os
import random
import sys
import tempfile
import warnings
from shared import create_test_inputs
# <frozen importlib._bootstrap>:914: ImportWarning: _SixMetaPathImporter.find_spec() not found; falling back to find_module()
# Suppress the noisy ImportWarning shown above, emitted when oss2 (via six) is imported.
warnings.filterwarnings(
    "ignore",
    category=ImportWarning,
    message=".*_SixMetaPathImporter.find_spec.*not found; falling back to find_module.*",
)
# https://github.com/boto/boto3/issues/454 - This isn't boto3, but the issue might be the same.
# Suppress unclosed-SSL-socket ResourceWarnings raised during uploads.
warnings.filterwarnings(
    "ignore",
    category=ResourceWarning,
    message=".*unclosed <ssl.SSLSocket.*",
)
# Imported after the filters above so oss2's import-time ImportWarning is silenced.
import oss2
# Part size for multipart uploads: 2**23 bytes = 8 MiB.
chunk_size = 2**23
# First initialize AccessKeyId, AccessKeySecret, Endpoint and other information.
# NOTE(review): access_key_id and bucket_name are "<...>" placeholders; the
# assertion loop below refuses to run until they are filled in.
access_key_id = "<accesskey>"
access_key_secret = os.environ["alibaba_secret_key"]
bucket_name = "<bucket>"
endpoint = "https://oss-accelerate.aliyuncs.com"
# Confirm that the above parameters are filled in correctly
for param in (access_key_id, access_key_secret, bucket_name, endpoint):
    assert "<" not in param, "Please set parameters:" + param
def calculate_file_md5(file_path, blocksize=2**20):
    """Return the base64-encoded MD5 digest of file_path's contents.

    Reads the file in blocksize-byte pieces so arbitrarily large files
    can be hashed with constant memory.  The base64 form is what OSS
    expects in the Content-MD5 request header.
    """
    digest = hashlib.md5()
    with open(file_path, "rb") as stream:
        while block := stream.read(blocksize):
            digest.update(block)
    return base64.b64encode(digest.digest()).decode("UTF-8")
def _prepare_temp_file(content):
"""
Create a temporary file.
:param content: file content
:return file name
"""
fd, pathname = tempfile.mkstemp(suffix="exam-progress-")
os.write(fd, content)
os.close(fd)
return pathname
def usage(retval):
    """Print a usage message and exit with *retval*.

    Writes to stdout when exiting successfully (retval == 0) and to
    stderr otherwise.
    """
    stream = sys.stdout if retval == 0 else sys.stderr
    stream.write(f"Usage: {sys.argv[0]} --with-md5 --without-md5 --help\n")
    sys.exit(retval)
def main():
    """Start the ball rolling."""
    use_md5 = None
    # Consume options one at a time from the front of argv.
    while len(sys.argv) > 1:
        option = sys.argv.pop(1)
        if option == "--with-md5":
            use_md5 = True
        elif option == "--without-md5":
            use_md5 = False
        elif option in ("--help", "-h"):
            usage(0)
        else:
            sys.stderr.write(f"{sys.argv[0]}: unrecognized option: {option}\n")
            usage(1)
    if use_md5 is None:
        sys.stderr.write(f"{sys.argv[0]}: you must specify one of --with-md5 or --without-md5\n")
        usage(1)
    create_test_inputs()
    # All object-related operations go through this Bucket handle.
    bucket = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name)
    test_files = (
        "dastromberg-urandom-1k",
        "dastromberg-urandom-16M",
        "dastromberg-urandom-256M",
        "dastromberg-urandom-1G",
        "dastromberg-urandom-5G",
    )
    all_good = True
    for filename in test_files:
        print(f"Checking Alibaba: (unknown)", flush=True)
        try:
            key = upload_one_file(bucket=bucket, filename=filename, use_md5=use_md5)
        except (oss2.exceptions.ClientError, oss2.exceptions.ServerError, oss2.exceptions.RequestError):
            print("Verification failed")
            all_good = False
        else:
            print("Verification succeeded")
            # Remove the uploaded object so the test bucket stays uncluttered.
            bucket.delete_object(key)
    if all_good:
        print("All tests passed")
    else:
        print("One or more tests failed", file=sys.stderr)
        sys.exit(1)
def random_string():
    """Return a random 16-character alphanumeric string.

    Used only to disambiguate object keys, so the non-cryptographic
    random module is sufficient here.
    """
    # BUG FIX: the original referenced the string module without importing
    # it anywhere in the file, so calling this raised NameError.
    import string
    return "".join(random.choices(string.ascii_uppercase + string.ascii_lowercase + string.digits, k=16))
def upload_one_file(*, filename, bucket, use_md5, disambig=None):
    """Upload one file, with an optional server-side MD5 check.

    :param filename: local path of the file to upload
    :param bucket: an oss2.Bucket (or compatible) object
    :param use_md5: when truthy, send a Content-MD5 header so OSS
        verifies the payload on receipt
    :param disambig: None (key == filename), "pid" (append the process
        id) or "random" (append a random suffix) to disambiguate the key
    :return: the object key the file was stored under
    :raises ValueError: if disambig is not one of the recognized values
    """
    if disambig is None:
        key = filename
    elif disambig == "pid":
        key = filename + str(os.getpid())
    elif disambig == "random":
        key = filename + random_string()
    else:
        raise ValueError(f"disambig had a strange value: {disambig}")
    headers = {}
    if use_md5:
        md5_hash = calculate_file_md5(filename)
        headers["Content-MD5"] = md5_hash
    # BUG FIX: the original passed `filename` as the object key, so the
    # disambiguation suffix computed above was silently never used.
    bucket.put_object_from_file(key, filename, headers=headers)
    return key
if __name__ == "__main__":
main()