在python3中使用parsedmarc创建dmarc解析器以用于AWS s3

问题描述 投票:0回答:1

我是编程新手。我正在搭建一个管道，用来分析发送到我电子邮件帐户的DMARC报告文件；这些文件目前由我手动上传到s3存储桶。此任务的目标是使用parsedmarc（https://github.com/domainaware/parsedmarc ）下载、提取并分析这些文件。我遇到的困难是：如何编写条件语句，使得当目标文件不是.zip文件时改为提取.gz文件。我认为gzip库足以满足此目的。以下是我目前的代码，使用的是python3和适用于AWS的boto3库。感谢任何帮助！

import parsedmarc    
import pprint
import json
import boto3
import zipfile
import gzip

# Module-level pretty-printer configured with a 2-space indent; main() uses it
# to dump parts of the parsed report.
pp = pprint.PrettyPrinter(indent=2)

def main():
    """Download a DMARC aggregate report archive from S3, unpack it, and report its status.

    The archive is fetched from the hard-coded bucket/key, unpacked to a single
    XML file (either from a .zip or a .gz archive), parsed with parsedmarc, and
    the DKIM/SPF result of the first record is printed together with that
    record's ``auth_results``.

    Raises:
        ValueError: if the downloaded zip archive contains no members.
    """
    # Set default session profile and region for the sandbox account. Access keys
    # are pulled from ~/.aws/config and ~/.aws/credentials; 'profile_name' is the
    # header for the account in those files.
    # Both values go in ONE call: a second setup_default_session() call replaces
    # the session created by the first, silently dropping its settings.
    boto3.setup_default_session(
        profile_name="aws-account-profile-name-goes-here",
        region_name="aws-region-goes-here",
    )

    # Locations of the downloaded archive and of the extracted XML report.
    dmarc_report_directory = '/home/user/dmarc/'
    dmarc_report_file = 'parseme.xml'
    archive_path = '/home/user/dmarc/parseme.zip'
    report_path = f"{dmarc_report_directory}{dmarc_report_file}"

    # Define the s3 resource, the bucket name, and the file to download.
    # It's hardcoded for now...
    s3_resource = boto3.resource('s3')
    s3_resource.Bucket('dmarc-parsing').download_file(
        'source-dmarc-report-filename.zip',
        archive_path,
    )

    # Decide how to unpack by inspecting the file content rather than its name,
    # so the same local path works for both .zip and .gz downloads.
    if zipfile.is_zipfile(archive_path):
        with zipfile.ZipFile(archive_path, 'r') as zip_ref:
            members = zip_ref.namelist()
            if not members:
                raise ValueError(f"{archive_path} is an empty zip archive")
            # Members keep their own names inside the archive, so read the
            # report explicitly instead of assuming extractall() yields
            # 'parseme.xml'.
            xml_bytes = zip_ref.read(members[0])
    else:
        # Not a zip archive: treat it as a gzip-compressed XML file.
        with gzip.open(archive_path, 'rb') as gz_ref:
            xml_bytes = gz_ref.read()

    # Write the raw XML to the fixed path that the parser reads from.
    with open(report_path, 'wb') as xml_file:
        xml_file.write(xml_bytes)

    # Parse the report with parsedmarc, then round-trip through json so the
    # result is made of plain JSON-compatible types.
    pd_report_output = parsedmarc.parse_aggregate_report_file(_input=report_path)
    pd_report_jsonified = json.loads(json.dumps(pd_report_output))

    # Only the first record of the report is evaluated here.
    first_record = pd_report_jsonified['records'][0]
    dkim_status = first_record['policy_evaluated']['dkim']
    spf_status = first_record['policy_evaluated']['spf']

    if dkim_status == 'fail' or spf_status == 'fail':
        print(f"{dmarc_report_file} reports failure. oh crap. report:")
    else:
        print(f"{dmarc_report_file} passes. great. report:")

    pp.pprint(first_record['auth_results'])


# Run main() only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()
amazon-web-services amazon-s3 python-3.7
1个回答
0
投票

这里是使用我发现的parsedmarc.parse_aggregate_report_xml方法的代码。希望这有助于其他人解析这些报告:

import parsedmarc
import pprint
import json
import boto3
import zipfile
import gzip

# Module-level pretty-printer configured with a 2-space indent.
pp = pprint.PrettyPrinter(indent=2)

def main():
    """Download a DMARC report from S3, parse it, and alert via SNS on alignment failure.

    The compressed report is downloaded from the hard-coded bucket to /tmp,
    its XML is extracted and parsed with parsedmarc, and if any record has a
    ``False`` value in its ``alignment`` mapping a fixed message is published
    to the hard-coded SNS topic.
    """
    # Set default session profile and region for the account. Access keys are
    # pulled from ~/.aws/config and ~/.aws/credentials; 'profile_name' is the
    # header for the account in those files.
    boto3.setup_default_session(profile_name="aws_profile_name_goes_here", region_name="region_goes_here")

    source_file = 'filename_in_s3_bucket.zip'
    destination_directory = '/tmp/'
    destination_file = 'compressed_report_file'
    local_path = f"{destination_directory}{destination_file}"

    # Define the s3 resource, the bucket name, and the file to download.
    # It's hardcoded for now...
    s3_resource = boto3.resource('s3')
    s3_resource.Bucket('bucket-name-for-dmarc-report-files').download_file(source_file, local_path)

    # Extract the XML from the downloaded file.
    # NOTE(review): presumably extract_xml copes with both .zip and .gz input,
    # which is why no explicit archive handling is done here — confirm against
    # the parsedmarc documentation for the installed version.
    outputxml = parsedmarc.extract_xml(local_path)

    # Run the parsedmarc analysis, then round-trip through json so the result
    # is made of plain JSON-compatible types.
    pd_report_output = parsedmarc.parse_aggregate_report_xml(outputxml)
    pd_report_jsonified = json.loads(json.dumps(pd_report_output))

    # The report counts as failed as soon as one record has a False alignment
    # value; any() stops at the first such record.
    # TODO: add logic for interpreting results in more detail.
    report_failed = any(
        False in record['alignment'].values()
        for record in pd_report_jsonified['records']
    )
    if not report_failed:
        return

    # On failure, publish a notification to SNS.
    message = "Your dmarc report failed at least one check. Review the log for details"

    sns_resource = boto3.resource('sns')
    sns_topic = sns_resource.Topic('arn:aws:sns:us-west-2:112896196555:TestDMARC')
    sns_topic.publish(Message=message)


# Run main() only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()
© www.soinside.com 2019 - 2024. All rights reserved.