我想使用以下 C# 代码来解密通过客户端加密加密的 blob,该代码在生产中运行,并遵循 Microsoft C# 示例:
public static IServiceCollection AddEncryptedFileStorage<T>(this IServiceCollection services, string configurationSectionName) where T : class, IFileStorage
{
string clientName = configurationSectionName;
services.AddAzureClients(clientBuilder =>
{
clientBuilder.AddClient<BlobServiceClient, BlobClientOptions>((options, credential, serviceProvider) =>
{
var configuration = serviceProvider.GetRequiredService<IConfiguration>();
var configurationSection = configuration.GetSection(configurationSectionName);
var storageAccountUri = FileStorageHelper.GetStorageAccountUri(configurationSection.GetRequiredValue<string>("StorageAccountName"));
string encryptionKeyId = configurationSection.GetValue<string>("EncryptionKeyId")!;
var keyClient = serviceProvider.GetRequiredService<KeyClient>();
KeyVaultKey key = keyClient.GetKey(encryptionKeyId, cancellationToken: CancellationToken.None);
var encryptionOptions = new ClientSideEncryptionOptions(ClientSideEncryptionVersion.V2_0)
{
KeyEncryptionKey = new CryptographyClient(key.Id, credential),
KeyResolver = new KeyResolver(credential),
KeyWrapAlgorithm = "RSA-OAEP"
};
var blobClientOptions = new SpecializedBlobClientOptions
{
ClientSideEncryption = encryptionOptions,
Retry = { Delay = options.Retry.NetworkTimeout, MaxRetries = options.Retry.MaxRetries }
};
return new BlobServiceClient(storageAccountUri, credential, blobClientOptions);
}).WithName(clientName);
});
services.TryAddSingleton(typeof(T), sp => FileStorageFactory(sp, clientName, configurationSectionName));
services.AddHealthChecks().AddBlobStorage(clientName);
return services;
}
按照 Microsoft Python 文档,我尝试运行以下代码,如果未设置
encryption_options
则打印数据,但否则给出此错误:
HttpResponseError: Decryption failed.
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-c5de91e5-86a9-4892-ae41-922218e4545e/lib/python3.10/site-packages/azure/storage/blob/_download.py:70, in process_content(data, start_offset, end_offset, encryption)
69 try:
---> 70 return decrypt_blob(
71 encryption.get("required") or False,
72 encryption.get("key"),
73 encryption.get("resolver"),
74 content,
75 start_offset,
76 end_offset,
77 data.response.headers,
78 )
79 except Exception as error:
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-c5de91e5-86a9-4892-ae41-922218e4545e/lib/python3.10/site-packages/azure/storage/blob/_encryption.py:905, in decrypt_blob(require_encryption, key_encryption_key, key_resolver, content, start_offset, end_offset, response_headers)
903 raise ValueError('Specified encryption version is not supported.')
--> 905 content_encryption_key = _validate_and_unwrap_cek(encryption_data, key_encryption_key, key_resolver)
907 if version == _ENCRYPTION_PROTOCOL_V1:
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-c5de91e5-86a9-4892-ae41-922218e4545e/lib/python3.10/site-packages/azure/storage/blob/_encryption.py:658, in _validate_and_unwrap_cek(encryption_data, key_encryption_key, key_resolver)
657 if not hasattr(key_encryption_key, 'get_kid') or not callable(key_encryption_key.get_kid):
--> 658 raise AttributeError(_ERROR_OBJECT_INVALID.format('key encryption key', 'get_kid'))
659 if not hasattr(key_encryption_key, 'unwrap_key') or not callable(key_encryption_key.unwrap_key):
AttributeError: key encryption key does not define a complete interface. Value of get_kid is either missing or invalid.
from azure.storage.blob import BlobServiceClient
from azure.identity import DefaultAzureCredential, DeviceCodeCredential
from azure.keyvault.keys import KeyClient
from azure.keyvault.keys.crypto import CryptographyClient
from azure.keyvault.keys.crypto import KeyWrapAlgorithm
from azure.core.exceptions import ResourceNotFoundError
credential = DeviceCodeCredential(additionally_allowed_tenants =["<tenant id>"])
def configure_blob_service_client(key_id, cred):
storage_account_name = "<storage_name>"
storage_account_uri = f"https://{storage_account_name}.blob.core.windows.net"
encryption_options = {
#"require_encryption": True,
"key_encryption_key": CryptographyClient(key_id, credential),#ExtendedCryptographyClient(key.id , credential),
"encryption_version": '2.0',
"key_wrap_algorithm": KeyWrapAlgorithm.rsa_oaep
}
blob_service_client = BlobServiceClient(account_url=storage_account_uri, credential=cred, **encryption_options)
return blob_service_client
key_vault_name = f"<key_vault_name>"
key_vault_uri = f"https://{key_vault_name}.vault.azure.net"
key_client = KeyClient(vault_url=key_vault_uri, credential=credential)
key_name = "<key_name>/<key_version>"
key = key_client.get_key(key_name)
display(key.id)
blob_service_client = configure_blob_service_client(key.id, credential)
blob_client = blob_service_client.get_blob_client(container="<container_name>", blob="<blob_name>")
downloaded = blob_client.download_blob()
blob_data = downloaded.readall()
blob_data
所以SDK期望密钥加密密钥有一个
get_kid()
方法(SDK源代码),但是SDK的CryptographyClient没有这样的方法。
尝试自己实现 get_kid() 方法,这会导致其他问题,所以可能不是正确的方法
解决方案只是扩展
CryptoGraphy
客户端...
class ExtendedCryptographyClient:
def __init__(self, key_id, credential):
self.cryptography_client = CryptographyClient(key_id, credential)
self.key_id = key_id
def wrap_key(self, key: bytes) -> bytes:
# Implement wrapping logic using cryptography_client
wrap_result = self.cryptography_client.wrap_key(KeyWrapAlgorithm.rsa_oaep, key)
return wrap_result.encrypted_key
def unwrap_key(self, key: bytes, algorithm: str) -> bytes:
# Implement unwrapping logic using cryptography_client
unwrap_result = self.cryptography_client.unwrap_key(algorithm, key)
return unwrap_result.key
def get_kid(self) -> str:
# Return the key ID
return self.key_id
def get_key_wrap_algorithm(self) -> str:
# Return the key wrap algorithm used
return KeyWrapAlgorithm.rsa_oaep