我是 AWS 的初学者,希望获得一些关于我们正在尝试实施的以下设计的建议。
我们正在通过 API Gateway 从 AWS Lamda 的输出响应中读取并获取来自 S3 的批量 json 文件的内容。
在 Lamda 内部,我们有一个 python 代码,它从 S3 读取文件并将 json 文件的内容合并到一个数组中,并在响应中获得相同的内容。
之后,外部应用程序通过 API 网关接收响应。
下面是相同的Python代码:-
import boto3
import json
import zipfile
from datetime import datetime as dt
client = boto3.resource('s3')
s3_client = boto3.client('s3')
bucket = ‘testlocalS3’
def lambda_handler(event, context):
paginator = s3_client.get_paginator('list_objects')
page_iterator = paginator.paginate(Bucket=bucket,Prefix=‘Test/‘)
isDeleted =event['isDeleted']
column1 =event[‘column1’]
publishDate =dt.fromisoformat(event['publishDate'])
file_list = []
for page in page_iterator:
for Contents in page['Contents']:
file_list.append(Contents['Key'])
#print(file_list)
result=[]
for key in file_list:
head_response = s3_client.head_object(Bucket=bucket, Key=key)
FileLastModifiedDate=dt.fromisoformat(str(head_response['LastModified'].replace(tzinfo=None)))
if (FileLastModifiedDate>=publishDate):
data = s3_client.get_object(Bucket=bucket, Key=key)
content = data['Body'].read().decode("utf-8") # Read and decode content once
# Check if the file is empty
if not content:
print(f"Skipping empty file: {key}")
continue
json_response = json.loads(content)
print(json_response)
if (str(json_response['isDeleted'])==isDeleted and str(json_response['column1']) in column1):
for item in json_response['column1']['column2’] :
if (item[‘column3’]!=‘SomeData'):
result.append(json_response)
return (result)
现在性能非常慢,当 S3 中有大量文件(例如 1000 个 json 文件)时,我们很难获得响应。
您能否就如何增强性能提出建议,这似乎是此设计的障碍?
当前的设计存在一些导致性能下降的瓶颈,特别是在处理大量文件时。以下是一些可提高 AWS Lambda 函数性能的增强功能:
对
list_objects
、head_object
和 get_object
的每次调用都涉及网络请求,当重复处理大量文件时,网络请求可能会很慢。减少 API 调用次数:
Amazon S3 Select查询 JSON 文件的特定部分,而不是使用
get_object
逐一获取单个文件。concurrent.futures
并行获取文件内容。 AWS Lambda 允许并发,因此您可以同时处理多个文件。import concurrent.futures
def fetch_file_content(key):
data = s3_client.get_object(Bucket=bucket, Key=key)
content = data['Body'].read().decode("utf-8")
return content
with concurrent.futures.ThreadPoolExecutor() as executor:
file_contents = list(executor.map(fetch_file_content, file_list))
在
list_objects
阶段过滤文件,以避免不必要的get_object
调用。使用 S3 对象标签或前缀来缩小相关文件的范围:
isDeleted
、column1
),则可以通过 Amazon S3 Inventory 或 S3 标记直接使用标签查询文件。ijson
这样的流式解析器来处理内容,而无需将整个文件加载到内存中。import ijson
for item in ijson.items(content, 'column1.column2'):
if item['column3'] != 'SomeData':
result.append(item)
考虑使用 Amazon S3 事件通知 仅针对新文件或修改后的文件触发 Lambda 函数,而不是迭代存储桶中的所有文件。
下面是包含并行处理和早期过滤的代码的优化版本:
import boto3
import json
import concurrent.futures
from datetime import datetime as dt
client = boto3.resource('s3')
s3_client = boto3.client('s3')
bucket = 'testlocalS3'
def fetch_file_content(key):
try:
data = s3_client.get_object(Bucket=bucket, Key=key)
content = data['Body'].read().decode("utf-8")
return content
except Exception as e:
print(f"Error fetching {key}: {str(e)}")
return None
def lambda_handler(event, context):
paginator = s3_client.get_paginator('list_objects')
page_iterator = paginator.paginate(Bucket=bucket, Prefix='Test/')
isDeleted = event['isDeleted']
column1 = event['column1']
publishDate = dt.fromisoformat(event['publishDate'])
file_list = []
# Collect file keys
for page in page_iterator:
for content in page.get('Contents', []):
file_list.append(content['Key'])
result = []
# Fetch file content in parallel
with concurrent.futures.ThreadPoolExecutor() as executor:
file_contents = list(executor.map(fetch_file_content, file_list))
for content in file_contents:
if not content:
continue
try:
json_response = json.loads(content)
FileLastModifiedDate = dt.fromisoformat(
str(json_response.get('LastModified', '')).replace(tzinfo=None)
)
if FileLastModifiedDate >= publishDate:
if str(json_response['isDeleted']) == isDeleted and str(json_response['column1']) in column1:
for item in json_response['column1']['column2']:
if item['column3'] != 'SomeData':
result.append(json_response)
except Exception as e:
print(f"Error processing content: {str(e)}")
continue
return result
这些优化应该会显着提高您的设计性能。