I am trying to unzip files in an Azure ADLS Gen2 container from Azure Databricks PySpark. When I use ZipFile, I get a BadZipFile error or a FileNotFoundError. I can read a CSV in the same folder, but not the zip file. The zip file path is the same path I get back from dbutils.fs.ls(blob_folder_url).
Code:
import zipfile, os, io, re

# Azure Blob Storage details
storage_account_name = "<>"
container_name = "<>"
folder_path = "<>"
blob_folder_url = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{folder_path}"
zip_file = blob_folder_url + 'batch1_weekly_catman_20241109.zip'

# List files in the specified blob folder
files = dbutils.fs.ls(blob_folder_url)
for file in files:
    # Check if the file is a ZIP file
    if file.name.endswith('.zip'):
        print(f"Processing ZIP file: {file.name}")

        # Read the ZIP file into memory
        zip_file_path = file.path
        zip_blob_data = dbutils.fs.head(zip_file_path)  # Read the ZIP file content

        # Unzip the file
        with zipfile.ZipFile(io.BytesIO(zip_blob_data.encode('utf-8')), 'r') as z:
            print('zipppppppper')

        # with zipfile.ZipFile(zip_file, 'r') as z:
        #     print('zipppppppper')
Here is the code I use to unzip the files, write the csv, and archive the zipped files. The root cause of both errors above: dbutils.fs.head returns the file contents as a UTF-8 decoded string (and only the first 64 KB by default), so round-tripping binary ZIP data through .encode('utf-8') corrupts it and ZipFile raises BadZipFile; and zipfile.ZipFile cannot open an abfss:// URL as a local path, which is why the commented-out variant raises FileNotFoundError. Reading the blob's raw bytes with the azure-storage-blob SDK avoids both problems.
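As a quick sanity check before going through the SDK, you can read the raw bytes directly with Spark's binaryFile source and confirm the ZIP magic number. A minimal sketch, assuming the notebook's spark session and the zip_file_path from the question:

# Minimal sketch: read the blob's raw bytes via Spark's binaryFile source.
# Assumes `spark` (the notebook session) and `zip_file_path` from the question.
import io, zipfile

df = spark.read.format("binaryFile").load(zip_file_path)
raw = bytes(df.select("content").first()["content"])  # whole file as bytes

print(raw[:4] == b"PK\x03\x04")  # True for a valid local ZIP file header

# The same bytes open cleanly with ZipFile:
with zipfile.ZipFile(io.BytesIO(raw)) as z:
    print(z.namelist())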
%pip install azure-storage-blob
dbutils.library.restartPython()
from azure.storage.blob import BlobServiceClient
from io import BytesIO
import tempfile, os, zipfile, time
def unzip_and_upload_to_blob(
    source_connection_string,
    source_container,
    source_zipped_files,
    dest_container,
    archive_folder_path
):
    for zipped_file in source_zipped_files:
        # Source blob client setup
        source_blob_service = BlobServiceClient.from_connection_string(source_connection_string)
        source_container_client = source_blob_service.get_container_client(source_container)
        source_blob_client = source_container_client.get_blob_client(zipped_file)

        # Destination blob client setup (using same connection string)
        dest_container_client = source_blob_service.get_container_client(dest_container)

        # Archive blob client setup
        archive_file_path = archive_folder_path + get_filename(zipped_file)
        archive_blob_client = source_container_client.get_blob_client(archive_file_path)

        # Create destination path with .csv extension
        dest_path = os.path.splitext(zipped_file)[0] + '.csv'

        # Download and process zip file in memory
        print(f"Downloading zip file from: {zipped_file}")
        blob_data = source_blob_client.download_blob()
        zip_bytes = blob_data.readall()
        zip_buffer = BytesIO(zip_bytes)

        # Create a temporary directory for extracted files
        with tempfile.TemporaryDirectory() as temp_dir:
            # Extract files to temporary directory
            print("Extracting zip file...")
            with zipfile.ZipFile(zip_buffer) as zip_ref:
                zip_ref.extractall(temp_dir)

            # Get list of CSV files in temp directory
            extracted_files = []
            for root, dirs, files in os.walk(temp_dir):
                for file in files:
                    if file.endswith('.csv'):  # Only process CSV files
                        local_file_path = os.path.join(root, file)
                        extracted_files.append(local_file_path)

            if not extracted_files:
                raise Exception("No CSV files found in the zip archive")

            if len(extracted_files) == 1:
                # If there's only one CSV file, upload it with the destination name
                with open(extracted_files[0], 'rb') as data:
                    print(f"Uploading to: {dest_path}")
                    dest_container_client.upload_blob(
                        name=dest_path,
                        data=data,
                        overwrite=True
                    )
                print(f"Successfully uploaded to: {dest_path}")

                # Archive the zipped blob: copy it to the archive folder, then delete the original
                try:
                    archive_blob_client.start_copy_from_url(source_blob_client.url)

                    # Poll the archive blob's properties until the copy completes
                    # (start_copy_from_url returns a dict, not an object with a .status attribute)
                    copy_status = archive_blob_client.get_blob_properties().copy
                    while copy_status.status == "pending":
                        time.sleep(1)
                        copy_status = archive_blob_client.get_blob_properties().copy

                    if copy_status.status == "success":
                        # Delete the original blob
                        source_blob_client.delete_blob()
                        print(f"Blob '{zipped_file}' moved to '{archive_file_path}' successfully.")
                    else:
                        print(f"Failed to copy blob: {copy_status.status}")
                except Exception as e:
                    print(f"An error occurred while moving the blob: {e}")
            else:
                # If there are multiple CSV files, raise an exception
                raise Exception(f"Found multiple CSV files in zip archive: {len(extracted_files)}. Expected only one.")
Here is the code that runs the function:
# unzip the files and upload unzipped files to blob, then delete the zipped files
unzip_and_upload_to_blob(
    source_connection_string=connection_string,
    source_container=source_container,
    source_zipped_files=zipped_files,
    dest_container=dest_container,
    archive_folder_path=archive_folder_path
)
Here is the code that gets the list of zipped files:
def list_zip_files(connection_string, source_container, source_blob_prefix):
    # Create the BlobServiceClient object
    blob_service_client = BlobServiceClient.from_connection_string(connection_string)

    # Get the container client
    container_client = blob_service_client.get_container_client(source_container)

    zipped_files = []

    # List blobs in the container
    print(f"Listing .zip blobs in container '{source_container}':")
    try:
        blob_list = container_client.list_blobs(name_starts_with=source_blob_prefix)
        for blob in blob_list:
            if blob.name.endswith('.zip'):  # match true .zip blobs only, not e.g. '.zip.bak'
                zipped_files.append(blob.name)
                print(f"Blob name: {blob.name}, Blob size: {blob.size} bytes")
    except Exception as e:
        print(f"An error occurred: {e}")

    return zipped_files
Here is the code that runs that function. The connection string is stored in Azure Key Vault.
# Azure Blob Storage details
connection_string = dbutils.secrets.get(scope='KEY_VAULT_NAME', key='KEY_NAME')
storage_account_name = "STORAGE_ACCOUNT_NAME"
source_container = "SOURCE_CONTAINER_NAME"
source_blob_prefix = 'BLOB_PREFIX'
dest_container = "DESTINATION_CONTAINER_NAME"
folder_path = "SOURCE_FILE_PATH"
archive_folder_path = "ARCHIVE_FILE_PATH"
# get a list of zipped files in the weekly_sales folder:
zipped_files = list_zip_files(connection_string, source_container, source_blob_prefix)
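To confirm the end-to-end run, you can list the destination container the same way. A minimal sketch reusing the names above (it assumes the same connection string is valid for the destination container):

# Minimal verification sketch, reusing connection_string / dest_container from above.
dest_client = BlobServiceClient.from_connection_string(connection_string) \
    .get_container_client(dest_container)

print(f"CSV blobs now in '{dest_container}':")
for blob in dest_client.list_blobs():
    if blob.name.endswith('.csv'):
        print(f"{blob.name} ({blob.size} bytes)")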