上传带有md5验证的大文件(多部分),对AWS S3和阿里巴巴OSS使用相同的代码?

问题描述 投票:0回答:1

我希望将(可能)大文件(最多 5 Gig)上传到 Amazon S3 和阿里巴巴 OSS,并进行验证 - 最好是 MD5 验证。

我的代码目前看起来像:

#!/usr/bin/env python3.10

"""
This toy program is an exploration of how to do verified, multipart file uploads to AWS S3 and Alibaba OSS.

It's a modified version of https://stackoverflow.com/questions/58921396/boto3-multipart-upload-and-md5-checking
"""

import os
import sys
import hashlib

import boto3
from botocore.exceptions import ClientError
from botocore.client import Config
from boto3.s3.transfer import TransferConfig


chunk_size = 2**23  # 8 MiB multipart chunk size; must match the upload's part size.


# This function is a re-worked function taken from here: https://stackoverflow.com/questions/43794838/multipart-upload-to-s3-with-hash-verification
# Credits to user: https://stackoverflow.com/users/518169/hyperknot
def calculate_s3_etag(file_path, chunk_size=chunk_size):
    """Calculate the S3/OSS multipart-upload ETag for a local file.

    For a single-part upload the ETag is the plain MD5 hexdigest.  For a
    multipart upload it is the MD5 of the concatenated per-part binary MD5
    digests, followed by "-<number of parts>".

    :param file_path: path of the local file to hash
    :param chunk_size: part size in bytes; must equal the multipart chunk
        size used for the actual upload or the ETags will not match
    :return: the expected ETag string (lowercase hex, no quotes)
    :raises ValueError: if the file is empty (no parts, so no ETag)
    """
    chunk_md5s = []

    with open(file_path, "rb") as fp:
        # Walrus operator requires 3.8+; the shebang targets 3.10.
        while data := fp.read(chunk_size):
            chunk_md5s.append(hashlib.md5(data))

    num_hashes = len(chunk_md5s)

    if not num_hashes:
        # An empty file produces no parts, so no multipart ETag exists for it.
        raise ValueError(f"cannot compute a multipart ETag for empty file: {file_path}")

    if num_hashes == 1:
        # Single-part upload: the ETag is just that part's MD5 hexdigest.
        return chunk_md5s[0].hexdigest()

    # Multipart upload: MD5 over the concatenation of the raw part digests.
    digest_byte_string = b"".join(m.digest() for m in chunk_md5s)
    digests_md5 = hashlib.md5(digest_byte_string)

    return f"{digests_md5.hexdigest().lower()}-{num_hashes}"


def s3_md5sum(bucket_name, resource_name, client):
    """Return the lowercase ETag (quotes stripped) of an uploaded object.

    :param bucket_name: name of the S3/OSS bucket
    :param resource_name: object key within the bucket
    :param client: a boto3 S3 client
    :return: the object's ETag with the surrounding double quotes removed
    :raises ClientError: if the HEAD request fails (e.g. no such key)
    """
    try:
        et = client.head_object(Bucket=bucket_name, Key=resource_name)["ETag"]
    except ClientError:
        # Bug fix: re-raise the caught exception instance.  The original
        # `raise ClientError` raised the *class*, which needs constructor
        # arguments and would itself blow up with a TypeError.
        raise
    # S3/OSS return the ETag wrapped in literal double quotes.
    assert et[0] == '"' and et[-1] == '"'
    return et[1:-1].lower()


def upload_one_file(
    *,
    filename,
    endpoint_url,
    aws_credentials,
    aws_region,
    bucket,
    addressing_style=None,
):
    """Upload one file to an S3-compatible service and return both ETags.

    The file can be up to 5 gigabytes in size.  Returns a tuple of
    (locally computed ETag, ETag reported by the service) so the caller
    can compare them to verify the upload.
    """
    # Build the botocore Config, including the addressing style when given
    # (Alibaba OSS needs "virtual" hosted-style addressing).
    config_kwargs = {"region_name": aws_region}
    if addressing_style:
        config_kwargs["s3"] = {"addressing_style": addressing_style}

    client_kwargs = dict(aws_credentials)
    client_kwargs["config"] = Config(**config_kwargs)
    if endpoint_url:
        client_kwargs["endpoint_url"] = endpoint_url
    s3_client = boto3.client("s3", **client_kwargs)

    # Force the transfer's part size to match what calculate_s3_etag assumes.
    s3_client.upload_file(
        filename, bucket, filename, Config=TransferConfig(multipart_chunksize=chunk_size)
    )

    local_etag = calculate_s3_etag(filename)
    remote_etag = s3_md5sum(bucket, filename, s3_client)

    return (local_etag, remote_etag)


class AuthorizationData:
    """Bundle of endpoint + credential settings for one S3/OSS service."""

    def __init__(
        self,
        *,
        name,
        bucket,
        aws_region,
        aws_access_key_id,
        aws_secret_access_key,
        endpoint_url=None,
        addressing_style=None,
    ):
        """Record the provided settings on the instance, unmodified."""
        self.name = name
        self.endpoint_url = endpoint_url
        self.bucket = bucket
        self.aws_region = aws_region
        # Packaged in exactly the keyword form boto3.client("s3", ...) expects.
        self.aws_credentials = dict(
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
        )
        self.addressing_style = addressing_style


def create_test_inputs():
    """Create the random test input files, skipping any that already exist."""
    specs = (
        ("dastromberg-urandom-1k", 2**10, 2**10),
        ("dastromberg-urandom-16M", 2**20, 2**24),
        ("dastromberg-urandom-256M", 2**20, 2**28),
        ("dastromberg-urandom-1G", 2**20, 2**30),
        ("dastromberg-urandom-5G", 2**20, 5 * 2**30),
    )
    for filename, blocksize, total_size in specs:
        if not os.path.isfile(filename):
            bytes_written = 0
            print("Creating test input %s" % filename)
            with (
                open("/dev/urandom", "rb") as randfile,
                open(filename, "wb") as outfile,
            ):
                while bytes_written < total_size:
                    block = randfile.read(blocksize)
                    outfile.write(block)
                    # /dev/urandom should never short-read at these sizes.
                    assert blocksize == len(block)
                    bytes_written += len(block)

                assert bytes_written == total_size

        # Guard against a stale, partially written file from an earlier run.
        assert os.path.getsize(filename) == total_size, "Final length does not match. Perhaps remove %s and try again?" % filename


def main():
    """Test some AWS S3- and Alibaba OSS-use."""
    # This works.
    aws = AuthorizationData(
        name="AWS",
        bucket="aaa",
        aws_region="us-west-2",
        aws_access_key_id="bbb",
        aws_secret_access_key=os.environ["aws_secret_key"],
    )

    alibaba = AuthorizationData(
        name="Alibaba",
        # endpoint_url="https://blz-p-cdn-gamepublishing.oss-cn-hangzhou.aliyuncs.com/",
        endpoint_url="https://oss-cn-hangzhou.aliyuncs.com",
        bucket="ccc",
        aws_region="cn-hangzhou",
        aws_access_key_id="ddd",
        aws_secret_access_key=os.environ["alibaba_secret_key"],
        addressing_style="virtual",
    )

    all_good = True

    create_test_inputs()

    # The two largest inputs are commented out to keep test runs short.
    test_files = (
        "dastromberg-urandom-1k",
        "dastromberg-urandom-16M",
        "dastromberg-urandom-256M",
        # "dastromberg-urandom-1G",
        # "dastromberg-urandom-5G",
    )

    for auth in (aws, alibaba):
        for filename in test_files:
            print(f"Checking {auth.name}: (unknown)")
            local_etag, remote_etag = upload_one_file(
                filename=filename,
                endpoint_url=auth.endpoint_url,
                aws_credentials=auth.aws_credentials,
                aws_region=auth.aws_region,
                bucket=auth.bucket,
                addressing_style=auth.addressing_style,
            )

            if local_etag == remote_etag:
                print("Verification succeeded: %s, %s" % (local_etag, remote_etag))
            else:
                all_good = False
                print("Verification failed: %s, %s" % (local_etag, remote_etag), file=sys.stderr)

    if not all_good:
        print("One or more tests failed", file=sys.stderr)
        sys.exit(1)
    print("All tests passed")


# Run only when executed as a script, so the file can be imported side-effect-free.
if __name__ == "__main__":
    main()

运行它时,AWS S3 验证所有测试输入,但阿里巴巴仅验证 1 KB 测试 - 16 MB 测试失败,更大的测试也是如此。

https://www.alibabacloud.com/help/en/oss/use-cases/can-i-use-etag-values-as-oss-md5-hashes-to-check-data-consistency 阿里巴巴说他们不建议使用MD5进行验证。

在 https://www.alibabacloud.com/help/en/oss/use-cases/check-data-transmission-integrity-by-using-crc-64 中,他们似乎相对彻底地探索了使用 x-oss-hash-crc64ecma 头。我宁愿使用 MD5(与 boto3 一起使用),但如果需要的话可以使用 crc64ecma(希望仍然与 boto3 一起使用)。

有人研究过这个吗?

有什么建议吗?

python amazon-web-services amazon-s3 boto3 alibaba-cloud-oss
1个回答
0
投票

事实证明,如果你愿意只上传小文件,或者如果你愿意跳过验证,可以通过 boto3 访问阿里巴巴。

我最终不得不使用阿里巴巴的oss2模块来获取大文件和验证上传。

这是使用 oss2 模块的示例代码:

#!/usr/local/cpython-3.10/bin/python3

# -*- coding: utf-8 -*-

"""
The following code shows how to use data verification when uploading/downloading using MD5.

For MD5 verification, users need to calculate the MD5 value of the uploaded content and put it in the `Content-MD5` of the header.

Note: Resumable upload does not support MD5. Other uploads/downloads support MD5 verification.

Based on https://github.com/aliyun/aliyun-oss-python-sdk/blob/master/examples/object_check.py
"""

import base64
import hashlib
import os
import random
import sys
import tempfile
import warnings

from shared import create_test_inputs

# <frozen importlib._bootstrap>:914: ImportWarning: _SixMetaPathImporter.find_spec() not found; falling back to find_module()
# <frozen importlib._bootstrap>:914: ImportWarning: _SixMetaPathImporter.find_spec() not found; falling back to find_module()
# Silence the six-related ImportWarning triggered when importing oss2 below.
warnings.filterwarnings(
    "ignore",
    category=ImportWarning,
    message=".*_SixMetaPathImporter.find_spec.*not found; falling back to find_module.*",
)
# https://github.com/boto/boto3/issues/454 - This isn't boto3, but the issue might be the same.
# NOTE(review): suppresses ResourceWarnings for SSL sockets the SDK leaves unclosed.
warnings.filterwarnings(
    "ignore",
    category=ResourceWarning,
    message=".*unclosed <ssl.SSLSocket.*",
)

import oss2


chunk_size = 2**23  # 8 MiB; kept for parity with the boto3 version of this script.

# First initialize AccessKeyId, AccessKeySecret, Endpoint and other information.
access_key_id = "<accesskey>"  # placeholder: replace before running
access_key_secret = os.environ["alibaba_secret_key"]
bucket_name = "<bucket>"  # placeholder: replace before running
endpoint = "https://oss-accelerate.aliyuncs.com"
# Confirm that the above parameters are filled in correctly:
# any value still containing "<" is an unreplaced placeholder, so fail fast at import.
for param in (access_key_id, access_key_secret, bucket_name, endpoint):
    assert "<" not in param, "Please set parameters:" + param


def calculate_file_md5(file_path, blocksize=2**20):
    """Return a base64-encoded MD5 digest of file_path.

    The base64 (not hex) form is what OSS expects in the Content-MD5
    request header.

    :param file_path: path of the local file to hash
    :param blocksize: read size in bytes; streaming keeps memory bounded
        for arbitrarily large files
    :return: base64 string of the raw 16-byte MD5 digest
    """
    hash_md5 = hashlib.md5()
    with open(file_path, "rb") as f:
        # iter() with a sentinel stops cleanly at the b"" returned at EOF.
        for chunk in iter(lambda: f.read(blocksize), b""):
            hash_md5.update(chunk)
    return base64.b64encode(hash_md5.digest()).decode("UTF-8")


def _prepare_temp_file(content):
    """
    Create a temporary file.

    :param content: file content
    :return file name
    """
    fd, pathname = tempfile.mkstemp(suffix="exam-progress-")
    os.write(fd, content)
    os.close(fd)
    return pathname


def usage(retval):
    """Write a usage message (stdout on success, stderr on error), then exit."""
    writer = sys.stdout.write if retval == 0 else sys.stderr.write
    writer(f"Usage: {sys.argv[0]} --with-md5 --without-md5 --help\n")
    sys.exit(retval)


def main():
    """Start the ball rolling."""
    use_md5 = None
    # Consume argv one option at a time; later options override earlier ones.
    while sys.argv[1:]:
        option = sys.argv[1]
        if option == "--with-md5":
            use_md5 = True
        elif option == "--without-md5":
            use_md5 = False
        elif option in ("--help", "-h"):
            usage(0)
        else:
            sys.stderr.write(f"{sys.argv[0]}: unrecognized option: {sys.argv[1]}\n")
            usage(1)
        del sys.argv[1]

    # The MD5 choice is deliberate, so force the caller to pick explicitly.
    if use_md5 is None:
        sys.stderr.write(f"{sys.argv[0]}: you must specify one of --with-md5 or --without-md5\n")
        usage(1)

    create_test_inputs()

    # Create a Bucket object. All Object-related interfaces can be accessed through the Bucket object
    bucket = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name)

    test_files = (
        "dastromberg-urandom-1k",
        "dastromberg-urandom-16M",
        "dastromberg-urandom-256M",
        "dastromberg-urandom-1G",
        "dastromberg-urandom-5G",
    )

    all_good = True
    for filename in test_files:
        print(f"Checking Alibaba: (unknown)", flush=True)
        try:
            key = upload_one_file(bucket=bucket, filename=filename, use_md5=use_md5)
        except (oss2.exceptions.ClientError, oss2.exceptions.ServerError, oss2.exceptions.RequestError):
            print("Verification failed")
            all_good = False
        else:
            print("Verification succeeded")
            # We don't want to clutter up the test storage.
            bucket.delete_object(key)

    if not all_good:
        print("One or more tests failed", file=sys.stderr)
        sys.exit(1)
    print("All tests passed")


def random_string():
    """Return a 16-character random alphanumeric string.

    Used to disambiguate object keys between concurrent test runs.
    """
    # Bug fix: the original referenced string.ascii_uppercase without the
    # `string` module ever being imported, so every call raised NameError.
    import string

    alphabet = string.ascii_uppercase + string.ascii_lowercase + string.digits
    return "".join(random.choices(alphabet, k=16))


def upload_one_file(*, filename, bucket, use_md5, disambig=None):
    """Upload one file, with an optional Content-MD5 integrity check.

    :param filename: local file to upload
    :param bucket: an oss2.Bucket to upload into
    :param use_md5: when true, send a Content-MD5 header so OSS verifies
        the payload server-side
    :param disambig: None, "pid", or "random" — how (if at all) to make
        the object key unique across runs
    :return: the object key actually used for the upload
    :raises ValueError: if disambig is not one of the recognized values
    """
    if disambig is None:
        key = filename
    elif disambig == "pid":
        key = filename + str(os.getpid())
    elif disambig == "random":
        key = filename + random_string()
    else:
        raise ValueError(f"disambig had a strange value: {disambig}")

    headers = {}

    if use_md5:
        # OSS rejects the upload if the payload's MD5 doesn't match this header.
        md5_hash = calculate_file_md5(filename)
        headers["Content-MD5"] = md5_hash

    # Bug fix: upload under the disambiguated key.  The original passed
    # `filename` as the object key, so the disambig logic was dead code and
    # the returned key could name an object that was never created.
    bucket.put_object_from_file(key, filename, headers=headers)
    return key


# Run only when executed as a script, so the file can be imported side-effect-free.
if __name__ == "__main__":
    main()
© www.soinside.com 2019 - 2024. All rights reserved.