从 S3 获取数据时,Lamba 性能非常慢

问题描述 投票:0回答:1

我是 AWS 的初学者,希望获得一些关于我们正在尝试实施的以下设计的建议。

我们正在通过 API Gateway 从 AWS Lamda 的输出响应中读取并获取来自 S3 的批量 json 文件的内容。

在 Lamda 内部,我们有一个 python 代码,它从 S3 读取文件并将 json 文件的内容合并到一个数组中,并在响应中获得相同的内容。

之后,外部应用程序通过 API 网关接收响应。

下面是相同的Python代码:-

import boto3
import json
import zipfile

from datetime import datetime as dt


client = boto3.resource('s3')
s3_client = boto3.client('s3')
bucket = ‘testlocalS3’

def lambda_handler(event, context):
    paginator = s3_client.get_paginator('list_objects')
    page_iterator = paginator.paginate(Bucket=bucket,Prefix=‘Test/‘)
    isDeleted =event['isDeleted']
    column1 =event[‘column1’]
    publishDate =dt.fromisoformat(event['publishDate'])
    file_list = []

    for page in page_iterator:
        for Contents in page['Contents']:
         file_list.append(Contents['Key'])
    #print(file_list)
    result=[]
    for key in file_list:
     head_response = s3_client.head_object(Bucket=bucket, Key=key)
     FileLastModifiedDate=dt.fromisoformat(str(head_response['LastModified'].replace(tzinfo=None)))
     
     if (FileLastModifiedDate>=publishDate):
         data = s3_client.get_object(Bucket=bucket, Key=key)
         content = data['Body'].read().decode("utf-8")  # Read and decode content once
        # Check if the file is empty
         if not content:
            print(f"Skipping empty file: {key}")
            continue
         
         json_response = json.loads(content)
         print(json_response)
         if (str(json_response['isDeleted'])==isDeleted and str(json_response['column1']) in column1):
             for item in json_response['column1']['column2’] :
                if (item[‘column3’]!=‘SomeData'):
                    result.append(json_response)

    return (result) 

现在性能非常慢,当 S3 中有大量文件(例如 1000 个 json 文件)时,我们很难获得响应。

您能否就如何增强性能提出建议,这似乎是此设计的障碍?

amazon-web-services python-2.7 amazon-s3 aws-lambda aws-api-gateway
1个回答
0
投票

当前的设计存在一些导致性能下降的瓶颈,特别是在处理大量文件时。以下是一些可提高 AWS Lambda 函数性能的增强功能:


1。减少 S3 API 调用

list_objects
head_object
get_object
的每次调用都涉及网络请求,当重复处理大量文件时,网络请求可能会很慢。减少 API 调用次数:

  • 使用批处理进行 S3 操作
    • 如果可以在 S3 端过滤数据,请使用
      Amazon S3 Select
      查询 JSON 文件的特定部分,而不是使用 get_object 逐一获取单个文件。
    • 或者,将较小的 JSON 文件合并到单个文件中或将它们分组到存档中(例如 zip),以最大程度地减少访问的对象数量。

2。并行处理

  • 利用多线程或多处理: 使用Python的
    concurrent.futures
    并行获取文件内容。 AWS Lambda 允许并发,因此您可以同时处理多个文件。
  • 示例:
import concurrent.futures

def fetch_file_content(key):
    data = s3_client.get_object(Bucket=bucket, Key=key)
    content = data['Body'].read().decode("utf-8")
    return content

with concurrent.futures.ThreadPoolExecutor() as executor:
    file_contents = list(executor.map(fetch_file_content, file_list))

3.尽早过滤文件

list_objects
阶段过滤文件,以避免不必要的
get_object
调用。使用 S3 对象标签或前缀来缩小相关文件的范围:

  • 如果您可以基于元数据标记文件(例如,
    isDeleted
    column1
    ),则可以通过 Amazon S3 Inventory 或 S3 标记直接使用标签查询文件。

4。本地缓存数据

  • 对于对 S3 的重复请求,实施缓存机制(例如,使用 Amazon ElasticCache 或 AWS Lambda 临时存储进行临时文件存储)。
  • 或者,将文件批量提取到 Lambda 内的临时目录,以避免冗余网络调用。

5。优化 JSON 解析

  • 如果 JSON 文件很大,请考虑使用像
    ijson
    这样的流式解析器来处理内容,而无需将整个文件加载到内存中。
  • 示例:
import ijson

for item in ijson.items(content, 'column1.column2'):
    if item['column3'] != 'SomeData':
        result.append(item)

6。利用事件驱动架构

考虑使用 Amazon S3 事件通知 仅针对新文件或修改后的文件触发 Lambda 函数,而不是迭代存储桶中的所有文件。


7。内存和超时优化

  • 增加分配给 Lambda 函数的内存。更多内存也会提高 CPU 处理能力。
  • 为大型处理任务设置适当的超时值,以避免提前终止。

8。卸载处理

  • 如果数据集太大,请将处理卸载到外部计算服务,例如AWS BatchAWS Glue或具有更多资源的 EC2 实例。
  • 或者,使用 AWS Step Functions 批量编排文件处理。

增强示例代码

下面是包含并行处理和早期过滤的代码的优化版本:

import boto3
import json
import concurrent.futures
from datetime import datetime as dt

client = boto3.resource('s3')
s3_client = boto3.client('s3')
bucket = 'testlocalS3'

def fetch_file_content(key):
    try:
        data = s3_client.get_object(Bucket=bucket, Key=key)
        content = data['Body'].read().decode("utf-8")
        return content
    except Exception as e:
        print(f"Error fetching {key}: {str(e)}")
        return None

def lambda_handler(event, context):
    paginator = s3_client.get_paginator('list_objects')
    page_iterator = paginator.paginate(Bucket=bucket, Prefix='Test/')
    isDeleted = event['isDeleted']
    column1 = event['column1']
    publishDate = dt.fromisoformat(event['publishDate'])
    file_list = []

    # Collect file keys
    for page in page_iterator:
        for content in page.get('Contents', []):
            file_list.append(content['Key'])

    result = []

    # Fetch file content in parallel
    with concurrent.futures.ThreadPoolExecutor() as executor:
        file_contents = list(executor.map(fetch_file_content, file_list))

    for content in file_contents:
        if not content:
            continue
        
        try:
            json_response = json.loads(content)
            FileLastModifiedDate = dt.fromisoformat(
                str(json_response.get('LastModified', '')).replace(tzinfo=None)
            )

            if FileLastModifiedDate >= publishDate:
                if str(json_response['isDeleted']) == isDeleted and str(json_response['column1']) in column1:
                    for item in json_response['column1']['column2']:
                        if item['column3'] != 'SomeData':
                            result.append(json_response)
        except Exception as e:
            print(f"Error processing content: {str(e)}")
            continue

    return result

其他建议

  1. 基准和分析:使用 AWS X-Ray 跟踪和分析您的 Lambda 函数以识别特定瓶颈。
  2. 批量响应:如果 API 网关响应有效负载大小超出限制,请对响应进行分页或分块以适应约束。
  3. 启用压缩:压缩响应(例如 Gzip)以减少有效负载大小并缩短 API 网关响应时间。

这些优化应该会显着提高您的设计性能。

© www.soinside.com 2019 - 2024. All rights reserved.