Unzipping files in Azure Blob Storage from Databricks

Question

I am trying to unzip files in an Azure ADLS Gen2 container using PySpark on Azure Databricks. When I use zipfile.ZipFile, I get a BadZipFile error or a FileNotFoundError.

I can read a CSV in the same folder, but not the zip files.
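
For context on why the CSV read succeeds: Spark's DataFrame reader resolves abfss:// URIs through the Hadoop filesystem layer, whereas Python's zipfile only accepts local file paths or file objects. A minimal sketch of the kind of read that works (the CSV file name is hypothetical; blob_folder_url is defined in the code below):

# Spark resolves the abfss:// URI itself, so no local copy is needed.
df = spark.read.csv(blob_folder_url + 'some_file.csv', header=True)  # hypothetical file name
df.show(5)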

The zip file path is identical to the file path I get back from dbutils.fs.ls(blob_folder_url).

(The original post linked screenshots of the BadZipFile error, the FileNotFoundError, and the working CSV-read code.)

Code:

import zipfile, os, io, re

# Azure Blob Storage details
storage_account_name = "<>"
container_name = "<>"
folder_path = "<>"

blob_folder_url = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{folder_path}"
zip_file = blob_folder_url + 'batch1_weekly_catman_20241109.zip'  # assumes folder_path ends with '/'

# List files in the specified blob folder
files = dbutils.fs.ls(blob_folder_url)

for file in files:
    # Check if the file is a ZIP file
    if file.name.endswith('.zip'):
        print(f"Processing ZIP file: {file.name}")

        # Read the ZIP file into memory.
        # NOTE: dbutils.fs.head() returns the blob decoded as a UTF-8 string,
        # and only up to the first 64 KB by default, so binary zip data is
        # corrupted here -- this is what causes BadZipFile below.
        zip_file_path = file.path
        zip_blob_data = dbutils.fs.head(zip_file_path)  # Read the ZIP file content

        # Unzip the file: re-encoding the decoded string does not restore the
        # original bytes, so ZipFile rejects the buffer.
        with zipfile.ZipFile(io.BytesIO(zip_blob_data.encode('utf-8')), 'r') as z:
            print('zipppppppper')
        # Opening the abfss:// URL directly raises FileNotFoundError, because
        # Python's zipfile only understands local file paths:
        # with zipfile.ZipFile(zip_file, 'r') as z:
        #     print('zipppppppper')

Error messages:

  1. BadZipFile: File is not a zip file
  2. FileNotFoundError: [Errno 2] No such file or directory
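
Both errors follow from how the zip is being read. dbutils.fs.head() decodes the blob as UTF-8 text and truncates it, which corrupts binary zip data (hence BadZipFile), and Python's zipfile cannot open an abfss:// URL directly (hence FileNotFoundError). A minimal binary-safe sketch, assuming the cluster can reach the path through dbutils (the local /tmp target is hypothetical):

import zipfile

# Copy the blob to the driver's local disk first; dbutils.fs.cp is
# binary-safe, and zipfile understands the resulting local path.
local_path = '/tmp/batch1_weekly_catman_20241109.zip'  # hypothetical local target
dbutils.fs.cp(zip_file_path, f'file:{local_path}')     # file: = driver-local filesystem
with zipfile.ZipFile(local_path, 'r') as z:
    print(z.namelist())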
python azure-blob-storage databricks azure-data-lake-gen2
1 Answer

Here is the code I use to unzip the file, write out the CSV, and archive the zipped file.

%pip install azure-storage-blob

dbutils.library.restartPython()  # restart so the newly installed package is importable

from azure.storage.blob import BlobServiceClient
from io import BytesIO
import tempfile, os, time, zipfile

def unzip_and_upload_to_blob(
    source_connection_string,
    source_container,
    source_zipped_files,
    dest_container,
    archive_folder_path
):
    
    for zipped_file in source_zipped_files:
        # Source blob client setup
        source_blob_service = BlobServiceClient.from_connection_string(source_connection_string)
        source_container_client = source_blob_service.get_container_client(source_container)
        source_blob_client = source_container_client.get_blob_client(zipped_file)
        
        # Destination blob client setup (using same connection string)
        dest_container_client = source_blob_service.get_container_client(dest_container)
        
        # Archive blob client setup (in the source container).
        # get_filename is a helper not shown in the post; see the sketch after
        # this function for a plausible definition.
        archive_file_path = archive_folder_path + get_filename(zipped_file)
        archive_blob_client = source_container_client.get_blob_client(archive_file_path)

        
        # Create destination path with .csv extension
        dest_path = os.path.splitext(zipped_file)[0] + '.csv'
        
        # Download and process zip file in memory
        print(f"Downloading zip file from: {zipped_file}")
        blob_data = source_blob_client.download_blob()
        zip_bytes = blob_data.readall()
        zip_buffer = BytesIO(zip_bytes)
        
        # Create a temporary directory for extracted files
        with tempfile.TemporaryDirectory() as temp_dir:
            # Extract files to temporary directory
            print("Extracting zip file...")
            with zipfile.ZipFile(zip_buffer) as zip_ref:
                zip_ref.extractall(temp_dir)
                
            # Get list of files in temp directory
            extracted_files = []
            for root, dirs, files in os.walk(temp_dir):
                for file in files:
                    if file.endswith('.csv'):  # Only process CSV files
                        local_file_path = os.path.join(root, file)
                        extracted_files.append(local_file_path)
            
            if not extracted_files:
                raise Exception("No CSV files found in the zip archive")
            
            # Upload the CSV file to destination
            if len(extracted_files) == 1:
                # If there's only one CSV file, upload it with the destination name
                with open(extracted_files[0], 'rb') as data:
                    print(f"Uploading to: {dest_path}")
                    dest_container_client.upload_blob(
                        name=dest_path,
                        data=data,
                        overwrite=True
                    )
                    print(f"Successfully uploaded to: {dest_path}")

                    # Archive the zipped blob by moving it to the archive folder
                    try:
                        # Copy the blob to the new location in the archive folder.
                        # start_copy_from_url returns a plain dict (copy id and
                        # initial status), so poll the destination blob's copy
                        # properties to wait for completion.
                        archive_blob_client.start_copy_from_url(source_blob_client.url)

                        # Wait for the copy to complete
                        copy_status = archive_blob_client.get_blob_properties().copy
                        while copy_status.status == "pending":
                            time.sleep(1)  # avoid a tight polling loop
                            copy_status = archive_blob_client.get_blob_properties().copy

                        if copy_status.status == "success":
                            # Delete the original blob to complete the "move"
                            source_blob_client.delete_blob()
                            print(f"Blob '{zipped_file}' moved to '{archive_file_path}' successfully.")
                        else:
                            print(f"Failed to copy blob: {copy_status.status}")
                    except Exception as e:
                        print(f"An error occurred while moving the blob: {e}")
            else:
                # If there are multiple CSV files, raise an exception
                raise Exception(f"Found multiple CSV files in zip archive: {len(extracted_files)}. Expected only one.")
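
The function above calls a get_filename helper that is not shown in the post. A plausible minimal definition, assuming it simply strips the folder prefix from the blob path:

import os

def get_filename(blob_path):
    # 'folder/sub/batch1.zip' -> 'batch1.zip'
    return os.path.basename(blob_path)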

Here is the code that runs the function:

# unzip the files, upload the unzipped CSVs to blob storage,
# then move the zipped files to the archive folder
unzip_and_upload_to_blob(
    source_connection_string=connection_string,
    source_container=source_container,
    source_zipped_files=zipped_files,
    dest_container=dest_container,
    archive_folder_path=archive_folder_path,
)

Here is the code that gets the list of zipped files:

def list_zip_files(connection_string, source_container, source_blob_prefix):
    # Create the BlobServiceClient object
    blob_service_client = BlobServiceClient.from_connection_string(connection_string)

    # Get the container client
    container_client = blob_service_client.get_container_client(source_container)

    zipped_files = []
    # List blobs in the container
    print(f"Listing .zip blobs in container '{source_container}':")
    try:
        blob_list = container_client.list_blobs(name_starts_with=source_blob_prefix)
        for blob in blob_list:
            # endswith() avoids matching blobs that merely contain ".zip" in the name
            if blob.name.endswith(".zip"):
                zipped_files.append(blob.name)
                print(f"Blob name: {blob.name}, Blob size: {blob.size} bytes")
    except Exception as e:
        print(f"An error occurred: {e}")
    return zipped_files
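
For reference, the function returns blob names relative to the container root, prefix included; a hypothetical call and result:

zipped_files = list_zip_files(connection_string, 'SOURCE_CONTAINER_NAME', 'weekly_sales/')
# -> ['weekly_sales/batch1_weekly_catman_20241109.zip', ...]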

Here is the code that runs that function; the connection string is stored in Azure Key Vault. Note that this cell must run before the unzip_and_upload_to_blob call above, since it produces zipped_files.

# Azure Blob Storage details
connection_string = dbutils.secrets.get(scope='KEY_VAULT_NAME', key='KEY_NAME')
storage_account_name = "STORAGE_ACCOUNT_NAME"
source_container = "SOURCE_CONTAINER_NAME"
source_blob_prefix = 'BLOB_PREFIX'
dest_container = "DESTINATION_CONTAINER_NAME"
folder_path = "SOURCE_FILE_PATH"
archive_folder_path = "ARCHIVE_FILE_PATH"

# get a list of zipped files in the weekly_sales folder:
zipped_files = list_zip_files(connection_string, source_container, source_blob_prefix)