I have a zip file containing images, uploaded to my storage account (ADLSv2):
storage acc: samplesa
container: samplecontainersa
data1: /folder1/sample1.exe
data2: /folder1/sample2.zip
I now need to read this zip and extract all the sample images into a PySpark DataFrame in my Synapse environment. Here is my code:
import zipfile
from pyspark.sql.functions import map_zip_with
zip_path = "abfss://[email protected]/folder1/sample2.zip"
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    file_list = zip_ref.namelist()
    image_files = [f for f in file_list if f.lower().endswith(('.jpg', '.jpeg', '.png'))]
    image_data = [(f, zip_ref.read(f)) for f in image_files]

df = spark.createDataFrame(image_data, ["filename", "image_bytes"])
df.show()
However, I get the following error:
No such file or directory: 'abfss://[email protected]/folder1/sample2.zip'
I can read other csv/txt files in the same directory, but run into problems only when accessing the exe and zip files. Any ideas? Thanks!
The error occurs because Python's zipfile module (like the built-in open) only understands local file paths, while abfss:// is a Hadoop filesystem scheme that only Spark readers understand; that is why spark.read handles your csv/txt files fine but zipfile cannot reach the zip. You can extract all the sample images from the .zip file into a PySpark DataFrame as follows:
First, download the zip with the Blob Storage SDK, extract the images, upload them back to a folder in ADLS, and verify the upload with the following code:
# Download the zip file
blob_client = blob_service_client.get_blob_client(container=container_name, blob="folder1/sample2.zip")
downloaded_blob = blob_client.download_blob().readall()

# Extract the zip file and upload the images to ADLS
with zipfile.ZipFile(BytesIO(downloaded_blob)) as z:
    for file_info in z.infolist():
        if file_info.filename.lower().endswith(('.jpg', '.jpeg', '.png')):
            print(f"Extracting and uploading {file_info.filename}")
            with z.open(file_info.filename) as file:
                # Create a blob client for each extracted image
                file_name = os.path.basename(file_info.filename)
                blob_client = blob_service_client.get_blob_client(container=container_name, blob=f"folder1/images/{file_name}")
                blob_client.upload_blob(file, overwrite=True)

# Verify that the images were uploaded
blobs_list = blob_service_client.get_container_client(container_name).list_blobs(name_starts_with="folder1/images/")
for blob in blobs_list:
    print(f"Uploaded file: {blob.name}")
Use the following code to read the extracted images into a DataFrame:
df_images = spark \
    .read \
    .format("binaryFile") \
    .option("pathGlobFilter", "*.{jpg,jpeg,png}") \
    .load("abfss://[email protected]/folder1/images/")

display(df_images)
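If you want to sanity-check the result, here is a minimal sketch that pulls one row to the driver and decodes its bytes with Pillow (this assumes Pillow is installed on your Spark pool; path and content are the columns the binaryFile source produces):

from io import BytesIO
from PIL import Image

# Fetch a single row and decode its raw bytes into an image
row = df_images.select("path", "content").first()
img = Image.open(BytesIO(row["content"]))
print(row["path"], img.size, img.mode)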
You will get the images back as a DataFrame; the binaryFile source yields path, modificationTime, length, and content columns.
Here is the complete code for your reference:
import os
import zipfile
from io import BytesIO
from azure.storage.blob import BlobServiceClient

# Initialize the connection to your ADLSv2 account
account_name = "<accountName>"
account_key = "<accountKey>"
container_name = "<containerName>"
blob_service_client = BlobServiceClient(account_url=f"https://{account_name}.blob.core.windows.net", credential=account_key)

# Download the zip file
blob_client = blob_service_client.get_blob_client(container=container_name, blob="folder1/sample2.zip")
downloaded_blob = blob_client.download_blob().readall()

# Extract the zip file and upload the images to ADLS
with zipfile.ZipFile(BytesIO(downloaded_blob)) as z:
    for file_info in z.infolist():
        if file_info.filename.lower().endswith(('.jpg', '.jpeg', '.png')):
            print(f"Extracting and uploading {file_info.filename}")
            with z.open(file_info.filename) as file:
                # Create a blob client for each extracted image
                file_name = os.path.basename(file_info.filename)
                blob_client = blob_service_client.get_blob_client(container=container_name, blob=f"folder1/images/{file_name}")
                blob_client.upload_blob(file, overwrite=True)

# Verify that the images were uploaded
blobs_list = blob_service_client.get_container_client(container_name).list_blobs(name_starts_with="folder1/images/")
for blob in blobs_list:
    print(f"Uploaded file: {blob.name}")

# Read the extracted images into a DataFrame
df_images = spark \
    .read \
    .format("binaryFile") \
    .option("pathGlobFilter", "*.{jpg,jpeg,png}") \
    .load("abfss://[email protected]/folder1/images/")

display(df_images)
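As a side note, if you would rather avoid the round-trip through the blob SDK, here is a sketch of an alternative that reads the zip itself with Spark's binaryFile source and unzips it in memory on the driver; this assumes the zip is small enough to fit in driver memory:

import zipfile
from io import BytesIO

# Read the zip as one binary file; the 'content' column holds its raw bytes
zip_df = spark.read.format("binaryFile") \
    .load("abfss://[email protected]/folder1/sample2.zip")
zip_bytes = zip_df.select("content").first()["content"]

# Unzip on the driver and keep only the image entries
with zipfile.ZipFile(BytesIO(zip_bytes)) as z:
    image_data = [(name, bytearray(z.read(name)))
                  for name in z.namelist()
                  if name.lower().endswith(('.jpg', '.jpeg', '.png'))]

df = spark.createDataFrame(image_data, ["filename", "image_bytes"])
df.show()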