从 S3 存储桶读取大型 XML 文件并使用 XSD 进行验证

问题描述 投票:0回答:1

我正在创建一个 Lambda 来根据 XSD 验证 XML 消息。我的所有 python 代码都在 AWS 上正常工作,但是由于文件可能非常大(大约 2GB),我需要找到一种方法将其分块并根据 XSD 而不是整个文件验证每条记录。

我已经设法将其按记录分块,并在处理具有几个重复“dataRecord”元素的文件时在AWS上正常工作,但是在从存储桶中读取大文件时,我得到“errorType”: “内存错误”。

这是 XML 的示例,其中“dataRecord”是我需要根据 XSD 进行验证的重复结构:

<root>
  <header>
    <fieldA>4ABC</fieldA>
  </header>
  <body>
    <dataRecordList>
      <dataRecord>
        <recordHeader>
          <type>ABC</type>
        </recordHeader>
      </dataRecord>
      <dataRecord>
        <recordHeader>
          <type>ABC</type>
        </recordHeader>
      </dataRecord>
    </dataRecordList>
  </body>
  <trailer>
    <records>1000</totalRecords>
  </trailer>
</root>

这是 Lambda 中出现“MemoryError”错误的代码:

xml_file = s3.get_object(Bucket="BucketName", Key="FileName")['Body'].read().decode('utf-8')

有没有办法从文件中检索记录而不尝试将整个文件放入内存?

(免责声明:Python 新手 :-D )

python aws-lambda large-files
1个回答
0
投票

使用 iterparse 函数,我能够处理 S3 存储桶中的大文件,而不会消耗太多内存。我处理了一个 3.4GB 的 XML 文件,而 Lambda 仅使用了 248MB。

这是代码,其中架构是完整消息,其中包括标头和尾部元素。因此,在根据 XSD 验证记录之前,将这些添加到每个重复数据记录中。

import json
from lxml import etree
import datetime
import boto3

s3 = boto3.client('s3') 

def lambda_handler(event, context):


bucket = 'my.bucket'
prefix = ""
xml_file_name = 'SOME_XML_FILE.xml'
xsd_file_name = 'SOME_SCHEMA.xsd'

valid_records = 0
invalid_records = 0

start = datetime.datetime.now()

print("Start Time:", start)

try:
    xml_root_open = "<root>"
    xml_root_close = "</root>"
    xml_header = "<header></header>"
    xml_drl_open = "<body><dataRecordList>"
    xml_drl_close = "</dataRecordList></body>"
    xml_trailer = "<trailer><totalRecords>0</totalRecords></trailer>"
    
    xml_top = xml_root_open + xml_header + xml_drl_open
    xml_bottom = xml_drl_close + xml_trailer + xml_root_close
    
    print("Get XML File")
    xml_file = s3.get_object(Bucket=bucket, Key=xml_file_name)['Body']
    
    print("Get XSD File")
    xsd_content = s3.get_object(Bucket=bucket, Key=xsd_file_name)['Body'].read().decode('utf-8')
    
    print("Parse Schema")
    xsd_doc = etree.fromstring(xsd_content.encode())
    xsd = etree.XMLSchema(xsd_doc)
    
    print("About to start loop")
    
    count = 0
    
    for event, elem in etree.iterparse(xml_file):
        if event == 'end':
            if elem.tag == 'dataRecord':
                # print("Found a Data Record!!")
                count+=1        
                
                dataRecord = etree.tostring(elem, encoding='unicode')
                # print(dataRecord)
                
                single_dr = xml_top + dataRecord + xml_bottom

                xml_doc = etree.fromstring(single_dr.encode())
                
                # Validating Record against XSD
                if not xsd.validate(xml_doc):
                    invalid_records = invalid_records + 1
                    # invalid record, print error
                    print(xsd.error_log)
            
                else:
                    # Valid Record
                    valid_records = valid_records + 1
                    

                elem.clear()
    
    print("Number of Data Records")
    print(count)
    
except Exception as e:
    print(e)
    raise e

end = datetime.datetime.now()

print("End Time: ", end)
print("VALID RECORDS: ", str(valid_records))
print("INVALID RECORDS: ", str(invalid_records))

return {
    'statusCode': 200,
    'body': json.dumps('Hello from Lambda! Test')
}
© www.soinside.com 2019 - 2024. All rights reserved.