下面是我们在 Azure Blob 存储上创建 PDF 文件、并从 Azure Blob 存储中读取文件的代码:
import logging
import os
from azure.storage.blob import ContainerClient
from starlette.config import Config
from datetime import timedelta
from datetime import datetime
import pytz
class azure_storage_account:
    """Thin wrapper around a single Azure Blob Storage container.

    Settings can be passed explicitly; any setting left empty is looked up
    through starlette's ``Config`` pointed at ``.env`` in the current working
    directory.
    """

    def __init__(self, azure_storage_connection_string="", container_name=""):
        """Resolve the connection settings and build the container client.

        Args:
            azure_storage_connection_string: Storage account connection
                string. When empty (``""`` or ``None``), the value of
                ``AZURE_STORAGE_CONNECTION_STRING`` is read from the
                configuration instead.
            container_name: Name of the container to operate on. When empty
                (``""`` or ``None``), ``AZURE_API_FILE_CONTAINER`` is read
                from the configuration instead.
        """
        config = None
        if not azure_storage_connection_string or not container_name:
            # Only consult the .env file when a setting is actually missing;
            # fully explicit callers should not depend on it existing.
            config = Config(os.path.join(os.getcwd(), ".env"))

        if azure_storage_connection_string:
            self.azure_storage_connection_string = azure_storage_connection_string
        else:
            self.azure_storage_connection_string = config(
                "AZURE_STORAGE_CONNECTION_STRING"
            )

        if container_name:
            self.container_name = container_name
        else:
            self.container_name = config("AZURE_API_FILE_CONTAINER")

        self.container_client = ContainerClient.from_connection_string(
            conn_str=self.azure_storage_connection_string,
            container_name=self.container_name,
        )

    def create_file(self, file_path, data=""):
        """Upload ``data`` to the blob ``file_path``, overwriting any existing blob.

        Args:
            file_path: Blob name (path inside the container).
            data: Content to upload.

        Returns:
            ``True`` when the upload call completed, ``False`` when it raised
            (the error is logged with its traceback).
        """
        try:
            blob_client = self.container_client.get_blob_client(file_path)
            blob_client.upload_blob(data, overwrite=True)
        except Exception:
            # Deliberately broad: callers rely on a boolean result rather
            # than on exceptions.
            logging.exception("Failed to upload blob %r", file_path)
            return False
        return True

    def read_file(self, file_path):
        """Download the blob ``file_path`` and return its full content.

        Args:
            file_path: Blob name (path inside the container).

        Returns:
            Whatever ``readall()`` yields for the blob, or ``False`` when the
            download raised (the error is logged with its traceback).
            NOTE(review): an empty blob presumably yields an empty, falsy
            value, so callers should test the result with ``is False``.
        """
        try:
            file_stream = self.container_client.download_blob(file_path)
            return file_stream.readall()
        except Exception:
            # Deliberately broad: failure is reported through the return value.
            logging.exception("Failed to download blob %r", file_path)
            return False
upload_blob 和 download_blob 各需要约 2 秒才能执行完,这增加了我们 API 的执行时间。
我尝试了异步方式,但没有缩短时间。functools 缓存也没有帮助;而要使用 Redis 缓存,我们需要一台新的服务器。
2 秒(2‑second)的延迟可能是当前网络和存储配置下往返时间的固有部分。我在下面的代码中使用了 max_concurrency,以便通过多线程更快地完成上传和下载任务。
main.py:
import time
from storage import AzureStorage
# Demo script: upload a local PDF to the container, download it again,
# and report how long each transfer took.
storage = AzureStorage()

local_file_path = "C:/KamSTO/xxxxxxxx/basic-text.pdf"
blob_file_path = "uploaded-file.pdf"

with open(local_file_path, "rb") as file:
    file_data = file.read()

# perf_counter() is the monotonic, high-resolution clock intended for
# measuring elapsed intervals; time() can jump if the system clock changes.
start_time = time.perf_counter()
upload_status = storage.upload_file(blob_file_path, file_data)
upload_time = time.perf_counter() - start_time
print(f"Upload Success: {upload_status}, Time Taken: {upload_time:.2f} seconds")

start_time = time.perf_counter()
downloaded_data = storage.download_file(blob_file_path)
download_time = time.perf_counter() - start_time

# download_file() signals failure with False, so compare against False
# explicitly: an empty blob is a successful download, not an error.
if downloaded_data is not False:
    with open("downloaded-file.pdf", "wb") as file:
        file.write(downloaded_data)
    print(f"Downloaded File Saved as: downloaded-file.pdf, Time Taken: {download_time:.2f} seconds")
else:
    print("Failed to download file")
storage.py(使用 BlobServiceClient):
import logging
from azure.storage.blob import BlobServiceClient
from config import AZURE_STORAGE_CONNECTION_STRING, AZURE_STORAGE_CONTAINER_NAME
class AzureStorage:
def __init__(self):
try:
self.blob_service_client = BlobServiceClient.from_connection_string(AZURE_STORAGE_CONNECTION_STRING)
self.container_client = self.blob_service_client.get_container_client(AZURE_STORAGE_CONTAINER_NAME)
except Exception as e:
logging.error(f"Failed to connect to Azure Blob Storage: {e}")
def upload_file(self, file_path, data):
"""Upload a file to Azure Blob Storage."""
try:
blob_client = self.container_client.get_blob_client(file_path)
blob_client.upload_blob(data, overwrite=True, max_concurrency=5)
return True
except Exception as e:
logging.error(f"Upload failed: {e}")
return False
def download_file(self, file_path):
"""Download a file from Azure Blob Storage."""
try:
blob_client = self.container_client.get_blob_client(file_path)
file_stream = blob_client.download_blob(max_concurrency=5)
file_content = file_stream.readall()
return file_content
except Exception as e:
logging.error(f"Download failed: {e}")
return False
config.py:
import os
from dotenv import load_dotenv
# Run python-dotenv's loader before reading the settings below
# (presumably this picks up the project's .env file -- confirm).
load_dotenv()

# Both settings are None when the corresponding variable is not set.
AZURE_STORAGE_CONNECTION_STRING = os.environ.get("AZURE_STORAGE_CONNECTION_STRING")
AZURE_STORAGE_CONTAINER_NAME = os.environ.get("AZURE_STORAGE_CONTAINER_NAME")
.env:
AZURE_STORAGE_CONNECTION_STRING=<storageConnectionString>
AZURE_STORAGE_CONTAINER_NAME=<containerName>
输出:
可以看到,该任务的总运行时间为 1.73 秒。
文件已下载(Downloaded)到本地项目路径中:
Blob 已上传(uploaded)到 Azure Blob 存储。