使用 Python 进行 Azure Blob 存储客户端解密

问题描述 投票:0回答:1

我想使用以下 C# 代码来解密通过客户端加密加密的 blob,该代码在生产中运行,并遵循 Microsoft C# 示例:

public static IServiceCollection AddEncryptedFileStorage<T>(this IServiceCollection services, string configurationSectionName) where T : class, IFileStorage
    {
        string clientName = configurationSectionName;
        services.AddAzureClients(clientBuilder =>
        {
            clientBuilder.AddClient<BlobServiceClient, BlobClientOptions>((options, credential, serviceProvider) =>
            {
                var configuration = serviceProvider.GetRequiredService<IConfiguration>();
                var configurationSection = configuration.GetSection(configurationSectionName);
 
                var storageAccountUri = FileStorageHelper.GetStorageAccountUri(configurationSection.GetRequiredValue<string>("StorageAccountName"));
                string encryptionKeyId = configurationSection.GetValue<string>("EncryptionKeyId")!;
 
                var keyClient = serviceProvider.GetRequiredService<KeyClient>();

                KeyVaultKey key = keyClient.GetKey(encryptionKeyId, cancellationToken: CancellationToken.None);

 
                var encryptionOptions = new ClientSideEncryptionOptions(ClientSideEncryptionVersion.V2_0)
                {
                    KeyEncryptionKey = new CryptographyClient(key.Id, credential),
                    KeyResolver = new KeyResolver(credential),
                    KeyWrapAlgorithm = "RSA-OAEP"
                };
 
                var blobClientOptions = new SpecializedBlobClientOptions
                {
                    ClientSideEncryption = encryptionOptions,
                    Retry = { Delay = options.Retry.NetworkTimeout, MaxRetries = options.Retry.MaxRetries }
                };
 
                return new BlobServiceClient(storageAccountUri, credential, blobClientOptions);
            }).WithName(clientName);
        });
        services.TryAddSingleton(typeof(T), sp => FileStorageFactory(sp, clientName, configurationSectionName));
 
        services.AddHealthChecks().AddBlobStorage(clientName);
 
        return services;
    }

按照 Microsoft Python 文档,我尝试运行以下代码,如果未设置

encryption_options
则打印数据,但否则给出此错误:

HttpResponseError: Decryption failed.
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-c5de91e5-86a9-4892-ae41-922218e4545e/lib/python3.10/site-packages/azure/storage/blob/_download.py:70, in process_content(data, start_offset, end_offset, encryption)
     69 try:
---> 70     return decrypt_blob(
     71         encryption.get("required") or False,
     72         encryption.get("key"),
     73         encryption.get("resolver"),
     74         content,
     75         start_offset,
     76         end_offset,
     77         data.response.headers,
     78     )
     79 except Exception as error:
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-c5de91e5-86a9-4892-ae41-922218e4545e/lib/python3.10/site-packages/azure/storage/blob/_encryption.py:905, in decrypt_blob(require_encryption, key_encryption_key, key_resolver, content, start_offset, end_offset, response_headers)
    903     raise ValueError('Specified encryption version is not supported.')
--> 905 content_encryption_key = _validate_and_unwrap_cek(encryption_data, key_encryption_key, key_resolver)
    907 if version == _ENCRYPTION_PROTOCOL_V1:
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-c5de91e5-86a9-4892-ae41-922218e4545e/lib/python3.10/site-packages/azure/storage/blob/_encryption.py:658, in _validate_and_unwrap_cek(encryption_data, key_encryption_key, key_resolver)
    657 if not hasattr(key_encryption_key, 'get_kid') or not callable(key_encryption_key.get_kid):
--> 658     raise AttributeError(_ERROR_OBJECT_INVALID.format('key encryption key', 'get_kid'))
    659 if not hasattr(key_encryption_key, 'unwrap_key') or not callable(key_encryption_key.unwrap_key):
AttributeError: key encryption key does not define a complete interface. Value of get_kid is either missing or invalid.
from azure.storage.blob import BlobServiceClient  
from azure.identity import DefaultAzureCredential, DeviceCodeCredential  
from azure.keyvault.keys import KeyClient  
from azure.keyvault.keys.crypto import CryptographyClient
from azure.keyvault.keys.crypto import KeyWrapAlgorithm  
from azure.core.exceptions import ResourceNotFoundError  

credential = DeviceCodeCredential(additionally_allowed_tenants =["<tenant id>"]) 
  
def configure_blob_service_client(key_id, cred):  
    storage_account_name = "<storage_name>"  
    storage_account_uri = f"https://{storage_account_name}.blob.core.windows.net"  

    encryption_options = {
        #"require_encryption": True,
        "key_encryption_key": CryptographyClient(key_id, credential),#ExtendedCryptographyClient(key.id , credential),
        "encryption_version": '2.0',
        "key_wrap_algorithm": KeyWrapAlgorithm.rsa_oaep
    }
      
    blob_service_client = BlobServiceClient(account_url=storage_account_uri, credential=cred, **encryption_options)  
    return blob_service_client

key_vault_name = f"<key_vault_name>"  
key_vault_uri = f"https://{key_vault_name}.vault.azure.net"  
    

key_client = KeyClient(vault_url=key_vault_uri, credential=credential)  
    
key_name = "<key_name>/<key_version>"
key = key_client.get_key(key_name)  
display(key.id)

blob_service_client = configure_blob_service_client(key.id, credential)  
  
blob_client = blob_service_client.get_blob_client(container="<container_name>", blob="<blob_name>")  

downloaded = blob_client.download_blob()  
blob_data = downloaded.readall()  
  
blob_data

所以SDK期望密钥加密密钥有一个

get_kid()
方法(SDK源代码),但是SDK的CryptographyClient没有这样的方法。

尝试自己实现 get_kid() 方法,这会导致其他问题,所以可能不是正确的方法

python c# azure encryption azure-blob-storage
1个回答
0
投票

解决方案只是扩展

CryptoGraphy
客户端...

class ExtendedCryptographyClient:  
def __init__(self, key_id, credential):  
    self.cryptography_client = CryptographyClient(key_id, credential)  
    self.key_id = key_id  

def wrap_key(self, key: bytes) -> bytes:  
    # Implement wrapping logic using cryptography_client  
    wrap_result = self.cryptography_client.wrap_key(KeyWrapAlgorithm.rsa_oaep, key)  
    return wrap_result.encrypted_key  

def unwrap_key(self, key: bytes, algorithm: str) -> bytes:  
    # Implement unwrapping logic using cryptography_client  
    unwrap_result = self.cryptography_client.unwrap_key(algorithm, key)  
    return unwrap_result.key  

def get_kid(self) -> str:  
    # Return the key ID  
    return self.key_id  

def get_key_wrap_algorithm(self) -> str:  
    # Return the key wrap algorithm used  
    return KeyWrapAlgorithm.rsa_oaep
© www.soinside.com 2019 - 2024. All rights reserved.