我是编程新手。我正在搭建一个处理流程(pipeline),用来分析发送到我邮箱的 DMARC 报告文件;这些文件目前由我手动上传到 S3 存储桶。该任务的目标是下载并解压这些文件,然后使用 parsedmarc 进行分析:https://github.com/domainaware/parsedmarc 。我遇到的困难是:如何编写条件语句,使得当目标文件不是 .zip 文件时改为解压 .gz 文件。我认为 gzip 库足以满足这一需求。
以下是我目前的代码,使用的是 Python 3 和用于 AWS 的 boto3 库。非常感谢任何帮助!
import parsedmarc
import pprint
import json
import boto3
import zipfile
import gzip
pp = pprint.PrettyPrinter(indent=2)


def _extract_report_xml(archive_path, extract_dir):
    """Extract the XML report from a downloaded archive and return its path.

    The archive type is detected from the file's contents rather than its
    name, so the same code path handles both ``.zip`` and ``.gz`` reports.

    Args:
        archive_path: Path of the downloaded archive on local disk.
        extract_dir: Directory (with trailing slash) to extract into.

    Returns:
        Path of the extracted ``.xml`` file.

    Raises:
        ValueError: The zip archive contains no ``.xml`` member.
        OSError: The file is neither a zip archive nor valid gzip data.
    """
    if zipfile.is_zipfile(archive_path):
        with zipfile.ZipFile(archive_path, 'r') as zip_ref:
            xml_members = [
                name for name in zip_ref.namelist()
                if name.lower().endswith('.xml')
            ]
            if not xml_members:
                raise ValueError(f"{archive_path} contains no .xml report")
            # The member keeps its own name inside the archive, so use the
            # path extract() reports instead of guessing a file name.
            return zip_ref.extract(xml_members[0], extract_dir)

    # Not a zip archive: treat it as a gzip-compressed XML file. gzip stores
    # a single stream, so the output file name is chosen here.
    with gzip.open(archive_path, 'rb') as gz_ref:
        xml_bytes = gz_ref.read()
    xml_path = f"{extract_dir}parseme.xml"
    with open(xml_path, 'wb') as xml_file:
        xml_file.write(xml_bytes)
    return xml_path


def main():
    """Download one DMARC aggregate report from S3, extract it, and print pass/fail.

    The bucket name, object key and local paths are hardcoded for now.
    """
    # Set the default session profile and region in ONE call: every call to
    # setup_default_session() replaces the previous default session, so two
    # separate calls would discard the region. Access keys are pulled from
    # ~/.aws/config and ~/.aws/credentials; 'profile_name' is the header of
    # the account in question in those files.
    boto3.setup_default_session(
        profile_name="aws-account-profile-name-goes-here",
        region_name="aws-region-goes-here",
    )

    source_file = 'source-dmarc-report-filename.zip'
    dmarc_report_directory = '/home/user/dmarc/'
    # Extension-neutral local name: the archive may be either zip or gzip.
    downloaded_archive = f"{dmarc_report_directory}parseme_archive"

    # Define the s3 resource, the bucket name, and the file to download.
    s3_resource = boto3.resource('s3')
    s3_resource.Bucket('dmarc-parsing').download_file(source_file, downloaded_archive)

    # Extract the archive (zip or gz) into its raw XML state.
    dmarc_report_file = _extract_report_xml(downloaded_archive, dmarc_report_directory)

    # Parse the report with parsedmarc, then round-trip through json so the
    # result is made of plain dicts and lists.
    # NOTE(review): parsedmarc may be able to read compressed reports
    # directly; confirm against its API docs before dropping the extraction.
    pd_report_output = parsedmarc.parse_aggregate_report_file(_input=dmarc_report_file)
    pd_report_jsonified = json.loads(json.dumps(pd_report_output))

    records = pd_report_jsonified['records']
    if not records:
        print(f"{dmarc_report_file} contains no records.")
        return

    # A report can hold many records; flag the report if ANY of them failed
    # the evaluated DKIM or SPF policy, not just the first one.
    report_failed = any(
        'fail' in (record['policy_evaluated']['dkim'], record['policy_evaluated']['spf'])
        for record in records
    )
    if report_failed:
        print(f"{dmarc_report_file} reports failure. oh crap. report:")
    else:
        print(f"{dmarc_report_file} passes. great. report:")
    for record in records:
        pp.pprint(record['auth_results'])


if __name__ == "__main__":
    main()
下面是使用我找到的 parsedmarc.parse_aggregate_report_xml 方法的代码,希望能帮助其他需要解析这些报告的人:
import parsedmarc
import pprint
import json
import boto3
import zipfile
import gzip
pp = pprint.PrettyPrinter(indent=2)


def main():
    """Download a DMARC report from S3, parse it, and alert via SNS on failure.

    The compressed report is downloaded to a local file, its XML is extracted
    and parsed with parsedmarc, and an SNS message is published when any
    record has a False value in its 'alignment' mapping. Bucket, object key,
    local paths and topic ARN are hardcoded for now.
    """
    # Set the default session profile and region for the account. Access keys
    # are pulled from ~/.aws/config and ~/.aws/credentials; 'profile_name' is
    # the header of the account in question in those files.
    boto3.setup_default_session(
        profile_name="aws_profile_name_goes_here",
        region_name="region_goes_here",
    )

    source_file = 'filename_in_s3_bucket.zip'
    destination_directory = '/tmp/'
    destination_file = 'compressed_report_file'
    local_archive = f"{destination_directory}{destination_file}"

    # Define the s3 resource, the bucket name, and the file to download.
    s3_resource = boto3.resource('s3')
    s3_resource.Bucket('bucket-name-for-dmarc-report-files').download_file(
        source_file, local_archive
    )

    # Extract the XML from the downloaded archive.
    outputxml = parsedmarc.extract_xml(local_archive)

    # Run the DMARC analysis, then round-trip through json so the result is
    # made of plain dicts and lists.
    pd_report_output = parsedmarc.parse_aggregate_report_xml(outputxml)
    pd_report_jsonified = json.loads(json.dumps(pd_report_output))

    # The report fails if any record has a False alignment value.
    report_failed = any(
        False in record['alignment'].values()
        for record in pd_report_jsonified['records']
    )

    # TODO: add logic for interpreting results in more detail.
    # On failure, publish a notification to SNS.
    if report_failed:
        message = "Your dmarc report failed at least one check. Review the log for details"
        sns_resource = boto3.resource('sns')
        sns_topic = sns_resource.Topic('arn:aws:sns:us-west-2:112896196555:TestDMARC')
        sns_topic.publish(Message=message)


if __name__ == "__main__":
    main()