Cannot access zip/exe files from ADLSv2 in Synapse


I have a zip file containing images that was uploaded to my storage account (ADLSv2):

storage acc: samplesa
container: samplecontainersa
data1: /folder1/sample1.exe
data2: /folder1/sample2.zip

I now need to read this zip and extract all the sample images into a PySpark DataFrame in my Synapse environment. Here is my code:

import zipfile

zip_path = "abfss://[email protected]/folder1/sample2.zip"
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    file_list = zip_ref.namelist()
    image_files = [f for f in file_list if f.lower().endswith(('.jpg', '.jpeg', '.png'))]
    image_data = [(f, zip_ref.read(f)) for f in image_files]

df = spark.createDataFrame(image_data, ["filename", "image_bytes"])
df.show()

However, I get the following error:

No such file or directory: 'abfss://[email protected]/folder1/sample2.zip'

I can read other csv/txt files in the same directory; the problem only occurs when accessing the exe and zip files. Any ideas? Thanks!

pyspark zip exe azure-synapse azure-data-lake-gen2
1 Answer

Python's zipfile module only understands local file paths, so it cannot open an abfss:// URL directly; that is why spark.read handles your csv/txt files fine but zipfile fails here. Instead, download the blob, extract it, and then let Spark read the extracted files. You can extract all the sample images from the .zip into a PySpark DataFrame as follows:

Read the zip file, extract it to a temporary path, and verify that the images were uploaded to that path with the following code:

# Download the zip file
blob_client = blob_service_client.get_blob_client(container=container_name, blob="folder1/sample2.zip")
downloaded_blob = blob_client.download_blob().readall()

# Extract the zip file and upload images to ADLS
with zipfile.ZipFile(BytesIO(downloaded_blob)) as z:
    for file_info in z.infolist():
        if file_info.filename.endswith(('.jpg', '.png')):
            print(f"Extracting and uploading {file_info.filename}")
            with z.open(file_info.filename) as file:
                # Define the blob client for each file
                file_name = os.path.basename(file_info.filename)
                blob_client = blob_service_client.get_blob_client(container=container_name, blob=f"folder1/images/{file_name}")
                blob_client.upload_blob(file, overwrite=True)

# Verify if images are uploaded
blobs_list = blob_service_client.get_container_client(container_name).list_blobs(name_starts_with="folder1/images/")
for blob in blobs_list:
    print(f"Uploaded file: {blob.name}")
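The extraction loop above can be sanity-checked locally without any Azure resources: the sketch below builds a small zip in memory (the entry names and bytes are made up for illustration) and applies the same extension filter.

```python
import zipfile
from io import BytesIO

# Build a small in-memory zip standing in for sample2.zip.
buf = BytesIO()
with zipfile.ZipFile(buf, "w") as z:
    z.writestr("img/a.jpg", b"\xff\xd8fake-jpeg")
    z.writestr("img/b.png", b"\x89PNGfake-png")
    z.writestr("readme.txt", b"not an image")

# Same filter as in the answer: keep only .jpg/.png entries.
with zipfile.ZipFile(BytesIO(buf.getvalue())) as z:
    images = [(fi.filename, z.read(fi.filename))
              for fi in z.infolist()
              if fi.filename.endswith((".jpg", ".png"))]

print([name for name, _ in images])  # → ['img/a.jpg', 'img/b.png']
```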

Read the extracted images into a DataFrame with the following code:

df_images = spark \
    .read \
    .format("binaryFile") \
    .option("pathGlobFilter", "*.{jpg,png}") \
    .load("abfss://[email protected]/folder1/images/")
display(df_images)

You will get the images as a DataFrame, with one row per file (columns: path, modificationTime, length, content).

Here is the complete code for your reference:

import zipfile
from azure.storage.blob import BlobServiceClient
from io import BytesIO
from azure.storage.blob import BlobClient
import os

# Initialize connection to your ADLSv2
account_name = "<accountName>"
account_key = "<accountKey>"
container_name = "<containerName>"
blob_service_client = BlobServiceClient(account_url=f"https://{account_name}.blob.core.windows.net", credential=account_key)

# Download the zip file
blob_client = blob_service_client.get_blob_client(container=container_name, blob="folder1/sample2.zip")
downloaded_blob = blob_client.download_blob().readall()

# Extract the zip file and upload images to ADLS
with zipfile.ZipFile(BytesIO(downloaded_blob)) as z:
    for file_info in z.infolist():
        if file_info.filename.endswith(('.jpg', '.png')):
            print(f"Extracting and uploading {file_info.filename}")
            with z.open(file_info.filename) as file:
                # Define the blob client for each file
                file_name = os.path.basename(file_info.filename)
                blob_client = blob_service_client.get_blob_client(container=container_name, blob=f"folder1/images/{file_name}")
                blob_client.upload_blob(file, overwrite=True)

# Verify if images are uploaded
blobs_list = blob_service_client.get_container_client(container_name).list_blobs(name_starts_with="folder1/images/")
for blob in blobs_list:
    print(f"Uploaded file: {blob.name}")

# Read the extracted images
df_images = spark \
    .read \
    .format("binaryFile") \
    .option("pathGlobFilter", "*.{jpg,png}") \
    .load("abfss://[email protected]/folder1/images/")

display(df_images)
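An alternative that avoids re-uploading the images: copy the zip to the driver's local disk and open it with zipfile from there, building the DataFrame rows directly from the extracted bytes. In Synapse the copy step would be something like mssparkutils.fs.cp(zip_path, "file:/tmp/sample2.zip") (an assumption; check your runtime's utilities). A minimal self-contained sketch, using a locally created stand-in zip:

```python
import os
import tempfile
import zipfile

def zip_to_rows(local_zip_path):
    """Return (filename, image_bytes) tuples for image entries in a local zip."""
    with zipfile.ZipFile(local_zip_path) as z:
        return [(fi.filename, z.read(fi.filename))
                for fi in z.infolist()
                if fi.filename.lower().endswith((".jpg", ".jpeg", ".png"))]

# In Synapse you would first copy the blob down, e.g. (hypothetical):
#   mssparkutils.fs.cp("abfss://[email protected]/folder1/sample2.zip",
#                      "file:/tmp/sample2.zip")
# Here we create a stand-in zip locally so the sketch runs anywhere.
tmp = os.path.join(tempfile.mkdtemp(), "sample2.zip")
with zipfile.ZipFile(tmp, "w") as z:
    z.writestr("photo.jpeg", b"fake-bytes")
    z.writestr("notes.txt", b"skip me")

rows = zip_to_rows(tmp)
# rows can then be fed to spark.createDataFrame(rows, ["filename", "image_bytes"])
```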