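I am creating a Lambda to validate XML messages against an XSD. All of my Python code works fine on AWS, but because the files can be very large (around 2 GB), I need a way to split them into chunks and validate each record against the XSD rather than the whole file.
I have managed to chunk the file by record, and this works on AWS for files with only a few repeating "dataRecord" elements, but when reading a large file from the bucket I get "errorType": "MemoryError".
Here is a sample of the XML, where "dataRecord" is the repeating structure I need to validate against the XSD: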
<root>
  <header>
    <fieldA>4ABC</fieldA>
  </header>
  <body>
    <dataRecordList>
      <dataRecord>
        <recordHeader>
          <type>ABC</type>
        </recordHeader>
      </dataRecord>
      <dataRecord>
        <recordHeader>
          <type>ABC</type>
        </recordHeader>
      </dataRecord>
    </dataRecordList>
  </body>
  <trailer>
    <totalRecords>1000</totalRecords>
  </trailer>
</root>
This is the line in the Lambda that fails with "MemoryError":
xml_file = s3.get_object(Bucket="BucketName", Key="FileName")['Body'].read().decode('utf-8')
Is there a way to retrieve records from the file without trying to load the whole file into memory?
(Disclaimer: new to Python :-D )
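Using the iterparse function, I was able to process large files from the S3 bucket without consuming much memory. I processed a 3.4 GB XML file and the Lambda used only 248 MB.
Here is the Lambda code. The schema describes the complete message, including the header and trailer elements, so these are added around each repeating dataRecord before it is validated against the XSD.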
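To show the streaming idea in isolation, here is a minimal sketch that runs iterparse over an in-memory stream instead of an S3 body. The sample data, the helper name `iter_record_types`, and the sibling-deletion step are assumptions added for illustration; they are not part of the Lambda itself.

```python
import io

from lxml import etree

# Small stand-in for the large file; in the Lambda this would be the S3 body.
SAMPLE = b"""<root>
  <header><fieldA>4ABC</fieldA></header>
  <body>
    <dataRecordList>
      <dataRecord><recordHeader><type>ABC</type></recordHeader></dataRecord>
      <dataRecord><recordHeader><type>DEF</type></recordHeader></dataRecord>
    </dataRecordList>
  </body>
  <trailer><totalRecords>2</totalRecords></trailer>
</root>"""


def iter_record_types(stream, tag="dataRecord"):
    """Yield the <type> text of each record while parsing incrementally."""
    # events=("end",) fires once an element is complete; tag= limits the
    # events to the repeating record element.
    for _, elem in etree.iterparse(stream, events=("end",), tag=tag):
        yield elem.findtext("recordHeader/type")
        # Release the content of the record that was just processed.
        elem.clear()
        # Also drop earlier siblings that the parent element still holds.
        while elem.getprevious() is not None:
            del elem.getparent()[0]


record_types = list(iter_record_types(io.BytesIO(SAMPLE)))
print(record_types)
```

Because only one record is kept in the tree at a time, memory use should stay roughly proportional to the size of a single record rather than the size of the file.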
import datetime
import json

import boto3
from lxml import etree

s3 = boto3.client('s3')


def lambda_handler(event, context):
    bucket = 'my.bucket'
    xml_file_name = 'SOME_XML_FILE.xml'
    xsd_file_name = 'SOME_SCHEMA.xsd'
    valid_records = 0
    invalid_records = 0
    count = 0

    start = datetime.datetime.now()
    print("Start Time:", start)

    try:
        # Envelope that turns a single dataRecord into a complete message,
        # because the XSD describes the whole message
        xml_root_open = "<root>"
        xml_root_close = "</root>"
        xml_header = "<header></header>"
        xml_drl_open = "<body><dataRecordList>"
        xml_drl_close = "</dataRecordList></body>"
        xml_trailer = "<trailer><totalRecords>0</totalRecords></trailer>"
        xml_top = xml_root_open + xml_header + xml_drl_open
        xml_bottom = xml_drl_close + xml_trailer + xml_root_close

        print("Get XML File")
        # Keep the streaming body; do not call .read() on the large file
        xml_file = s3.get_object(Bucket=bucket, Key=xml_file_name)['Body']

        print("Get XSD File")
        # The schema is small, so reading it fully is fine
        xsd_content = s3.get_object(Bucket=bucket, Key=xsd_file_name)['Body'].read()

        print("Parse Schema")
        xsd_doc = etree.fromstring(xsd_content)
        xsd = etree.XMLSchema(xsd_doc)

        print("About to start loop")
        # 'action' avoids shadowing the handler's 'event' argument
        for action, elem in etree.iterparse(xml_file, events=('end',)):
            if elem.tag == 'dataRecord':
                count += 1
                data_record = etree.tostring(elem, encoding='unicode')
                single_dr = xml_top + data_record + xml_bottom
                xml_doc = etree.fromstring(single_dr.encode())
                # Validate the wrapped record against the XSD
                if not xsd.validate(xml_doc):
                    invalid_records += 1
                    # Invalid record, print the errors
                    print(xsd.error_log)
                else:
                    valid_records += 1
                # Release the processed record. Clearing only here keeps the
                # child elements intact until their dataRecord is serialized
                elem.clear()

        print("Number of Data Records")
        print(count)
    except Exception as e:
        print(e)
        raise

    end = datetime.datetime.now()
    print("End Time: ", end)
    print("VALID RECORDS: ", valid_records)
    print("INVALID RECORDS: ", invalid_records)

    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda! Test')
    }
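The wrap-and-validate step can also be tried locally without S3. The sketch below assumes a much-reduced, hypothetical XSD that only mirrors the sample message structure (the real schema is not shown in this post), and the helper `validate_record` is an illustrative name. It returns the validator's error messages rather than printing the whole error log.

```python
from lxml import etree

# Hypothetical, reduced schema standing in for the real XSD.
XSD = b"""<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
  <xs:element name="root">
    <xs:complexType>
      <xs:sequence>
        <xs:element name="header"/>
        <xs:element name="body">
          <xs:complexType>
            <xs:sequence>
              <xs:element name="dataRecordList">
                <xs:complexType>
                  <xs:sequence>
                    <xs:element name="dataRecord" maxOccurs="unbounded">
                      <xs:complexType>
                        <xs:sequence>
                          <xs:element name="recordHeader">
                            <xs:complexType>
                              <xs:sequence>
                                <xs:element name="type" type="xs:string"/>
                              </xs:sequence>
                            </xs:complexType>
                          </xs:element>
                        </xs:sequence>
                      </xs:complexType>
                    </xs:element>
                  </xs:sequence>
                </xs:complexType>
              </xs:element>
            </xs:sequence>
          </xs:complexType>
        </xs:element>
        <xs:element name="trailer">
          <xs:complexType>
            <xs:sequence>
              <xs:element name="totalRecords" type="xs:integer"/>
            </xs:sequence>
          </xs:complexType>
        </xs:element>
      </xs:sequence>
    </xs:complexType>
  </xs:element>
</xs:schema>"""

# Same envelope idea as in the handler: dummy header and trailer around one record.
TOP = b"<root><header></header><body><dataRecordList>"
BOTTOM = (b"</dataRecordList></body>"
          b"<trailer><totalRecords>0</totalRecords></trailer></root>")


def validate_record(schema, record_xml):
    """Wrap one serialized dataRecord and return its validation messages."""
    doc = etree.fromstring(TOP + record_xml + BOTTOM)
    if schema.validate(doc):
        return []
    # Each log entry carries the validator's message for one problem.
    return [err.message for err in schema.error_log]


schema = etree.XMLSchema(etree.fromstring(XSD))
good = b"<dataRecord><recordHeader><type>ABC</type></recordHeader></dataRecord>"
bad = b"<dataRecord><recordHeader><kind>ABC</kind></recordHeader></dataRecord>"

good_errors = validate_record(schema, good)
bad_errors = validate_record(schema, bad)
print(good_errors)
print(bad_errors)
```

One caveat of this approach: the dummy header and trailer must themselves satisfy the schema, otherwise every record would be reported as invalid for reasons unrelated to its own content.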