Dataflow code returns empty output from the DoFn class

Problem description

I am trying to run the code below on Dataflow. I have defined 2-3 functions in a class; 2 of them are working, but send_email() is not working and is not throwing any error either. Please help me resolve the issue.

# Dataflow pipeline code to run
import apache_beam as beam
import os
import argparse
import logging
import pandas as pd
import datetime
import pytz
from oauth2client.client import GoogleCredentials
from datetime import datetime,date,timedelta
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import time
import json
import requests
from google.cloud import pubsub_v1
import google.cloud.dlp_v2
from json import loads
import smtplib
from email.utils import formataddr
#from google.cloud import dlp_v2
from google.protobuf import json_format

class readandwrite(beam.DoFn):

    def send_email(self,content_json):
        smtp_email = "[email protected]"
        smtp_password = "password"
        email_subject_bigquery = "PII Information Detected!"
        email_body_bigquery = f'''
        Dear BigQuery Admin,

        We want to inform you that personally identifiable information (PII) has been detected in a transaction.

        - Duration: {content_json["duration"]} seconds

        - Text: "{content_json["text"]}"

        Appropriate actions have been taken by the DLP team to secure the data.

        Thank you,
        DataLake DLP Admin Team,
        sush Ltd.
        '''
        try:
            with smtplib.SMTP("smtp.gmail.com", 587) as server:
                server.starttls()
                server.login(smtp_email, smtp_password)
                sender_name = "DLP DataLake Admin Team"
                sender_address = "[email protected]"
                formatted_sender = formataddr((sender_name, sender_address))
                email_message_bigquery = f"From: {formatted_sender}\nSubject: {email_subject_bigquery}\nTo: [email protected]\n\n{email_body_bigquery}"
                server.sendmail(smtp_email, "[email protected]", email_message_bigquery)
                
            print("Emails sent successfully.")

        except Exception as e:

            print("Error sending emails:", str(e))


    def deidentify_content_with_dlp(self,content_json):
        import google.cloud.dlp_v2
        # Existing code for DLP de-identification
        dlp_client = google.cloud.dlp_v2.DlpServiceClient()
        item = {"value": json.dumps(content_json)}
        credit_card_info_type = {"name": "CREDIT_CARD_NUMBER"}
        phone_number_info_type = {"name": "PHONE_NUMBER"}

        # The transformation configuration to de-identify the content.
        deidentify_config = {
            "info_type_transformations": {
                "transformations": [
                    {
                        "info_types": [credit_card_info_type],
                        "primitive_transformation": {
                            "character_mask_config": {
                                "masking_character": "#",
                                "number_to_mask": 7,
                                "reverse_order": True,
                            }
                        },
                    },
                    {
                        "info_types": [phone_number_info_type],
                        "primitive_transformation": {
                            "character_mask_config": {
                                "masking_character": "#",
                                "number_to_mask": 7,
                                "reverse_order": True,
                            }
                        }
                    },
                ]
            }
        }

        # Convert the project ID into a full resource ID.
        project_id = "sandeepdev"
        parent = f"projects/{project_id}"

        # Call the API to inspect and de-identify the content.
        try:
            response = dlp_client.deidentify_content(
                request={
                    "parent": parent,
                    "deidentify_config": deidentify_config,
                    "item": item,
                }
            )

            # Check if PII information was found by DLP
            logging.info("Applying DLP de-identification...")
            if response.item.value and response.overview.transformation_summaries:
                print("PII information found. Creating ServiceNow incident.")
                self.send_email(content_json)
                print("Email send to BigQuery Admin")

            logging.info("DLP de-identification completed...")
            return json.loads(response.item.value) if response.item.value else content_json
        except Exception as e:
            print(f"Error: {e}")
            print("Error during de-identification. Inserting original content to BigQuery.")
            return content_json  # In case of an error, insert the original content to BigQuery

    def process(self, context):
        import time
        import json
        import requests
        from google.cloud import pubsub_v1
        import google.cloud.dlp_v2
        from json import loads
        import smtplib
        from email.utils import formataddr
        from google.cloud import bigquery
        project_id = "sandeepdev"
        subscription_id = "audio_msg-sub"
        client_bigquery = bigquery.Client()
        subscriber = pubsub_v1.SubscriberClient()
        subscription_path = subscriber.subscription_path(project_id, subscription_id)
        masked_table_id = "sandeepdev.call_interaction.cleansed_dlp_raw_table"
        raw_table_id="sandeepdev.call_interaction.raw_audio_data"
        dlp_count_table_id="sandeepdev.call_interaction.dlp_count"
        max_messages = 1
        count=0
        rows_to_insert_raw = []  # Buffers for the BigQuery inserts below
        rows_to_insert_masked = []
        rows_to_insert_dlp_count = []
        logging.info("Starting data processing workflow...")
        while True:
            response = subscriber.pull(request={"subscription": subscription_path, "max_messages": max_messages})

            for received_message in response.received_messages:
                message = received_message.message
                content_json = json.loads(message.data.decode('utf-8'))
                masked_data = self.deidentify_content_with_dlp(content_json)
                logging.info(masked_data)
                audio_file_name=masked_data['audio_file_name']
                print("masked data is :" , masked_data)
                dlp_list= masked_data['text'].split(" ")
                for value in dlp_list:
                    if '#' in value:
                        count += 1
                print("masked data count:", count)
                insert_data = {
                    "audio_file_name": audio_file_name,
                    "PII_count": count
                }
                rows_to_insert_dlp_count.append(insert_data)
                load_PII_count = client_bigquery.insert_rows_json(dlp_count_table_id, rows_to_insert_dlp_count)
                count=0
                ##extract audio file name and the count value and insert to another bigquery table
                subscriber.acknowledge(request={"subscription": subscription_path, "ack_ids": [received_message.ack_id]})
                #inserts original data to a different bigquery table
                rows_to_insert_raw.append(content_json)
                load_raw = client_bigquery.insert_rows_json(raw_table_id, rows_to_insert_raw)

                # Insert the masked data into dlp_masked_data table
                rows_to_insert_masked.append(masked_data)
                load_masked = client_bigquery.insert_rows_json(masked_table_id, rows_to_insert_masked)

                rows_to_insert_raw = []  # Clear the list after inserting the rows
                rows_to_insert_masked = []  # Clear the list after inserting the rows
                rows_to_insert_dlp_count=[]
                logging.info("Data processing workflow completed.")


            time.sleep(2)
    

def run():    
    try: 
        parser = argparse.ArgumentParser()
        parser.add_argument(
            '--dfBucket',
            required=True,
            help= ('Bucket where JARS/JDK is present')
            )
        known_args, pipeline_args = parser.parse_known_args()
        global df_Bucket 
        df_Bucket = known_args.dfBucket
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
        pipeline_options = PipelineOptions(pipeline_args)
        pipeline_options.view_as(StandardOptions).streaming = True
        pcoll = beam.Pipeline(options=pipeline_options)
        logging.info("Pipeline Starts")
        dummy= pcoll | 'Initializing..' >> beam.Create(['1'])
        dummy_env = dummy | 'Processing' >>  beam.ParDo(readandwrite())
        p = pcoll.run()
        logging.info('Job Run Successfully!')
        p.wait_until_finish()
    except Exception as e:
        logging.exception('Failed to launch datapipeline: %s', str(e))
        raise
if __name__ == '__main__':
    run()

In the scenario described, the de-identification happens and the data is loaded into BigQuery, but the email function does not work. When I run it locally with the DirectRunner it works as expected, but not with the Dataflow runner.

Run command:

python3 /home/sandeepdev751/dlp_dataflow.py --runner DataflowRunner --project sandeepdev  --region us-central1 --job_name dlpcheckerv5 --temp_location  gs://sandeepdev/temp  --dfBucket "sandeepdev" --setup_file /home/sandeepdev751/setup.py
Tags: python-3.x, google-cloud-platform, google-cloud-dataflow, apache-beam, python-class
1 Answer

Hard to debug without any logs, but my guess is: your Dataflow service account does not have access to the SMTP server.
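
One way to test that guess is to temporarily add a small probe DoFn to the pipeline and check the worker logs. This is only a sketch; the CheckSmtpEgress name is hypothetical, and the host/port come from the smtplib.SMTP call in the question.

import logging
import socket

import apache_beam as beam

class CheckSmtpEgress(beam.DoFn):
    # Hypothetical probe: can a Dataflow worker open a TCP connection
    # to the SMTP endpoint used in the question?
    def process(self, element):
        try:
            with socket.create_connection(("smtp.gmail.com", 587), timeout=10):
                logging.info("SMTP endpoint reachable from this worker")
        except OSError:
            logging.exception("SMTP endpoint NOT reachable from this worker")
        yield element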

With your current setup you will never see any errors, because Dataflow does not log print statements. You can see them when using the DirectRunner, but not with the DataflowRunner. Just switch to logging and you will start seeing the problems.
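
For example, a minimal sketch of what that looks like inside a DoFn (the SendEmailFn name is hypothetical; the point is that logging output, unlike print, shows up in the Dataflow worker logs):

import logging

import apache_beam as beam

class SendEmailFn(beam.DoFn):
    def process(self, element):
        try:
            # ... build and send the email as in the question ...
            logging.info("Email sent for %s", element.get("audio_file_name"))
        except Exception:
            # logging.exception records the full traceback, so the failure
            # becomes visible in Cloud Logging on the Dataflow workers.
            logging.exception("Error sending email")
        yield element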

Apart from that, I strongly recommend refactoring your code. There are several things that go against the Beam framework:

  1. As @XQHu mentioned, use the Beam-native ReadFromPubSub source (see here); a sketch follows after this list. While the while loop works, you will run into a lot of problems as soon as you want to drain the pipeline. Also, if you run on Dataflow, the native Pub/Sub input is optimized by Google.
  2. Once you use ReadFromPubSub, you no longer need the beam.Create dummy input. You can also drop the streaming=True option, since a pipeline reading from Pub/Sub streams natively. On the other hand, your p.wait_until_finish() will no longer make much sense. Just search for Pub/Sub ingestion pipelines; there are plenty of examples and it is not hard.
  3. Split this huge DoFn into several smaller DoFns. That makes later debugging much easier (not a must).
  4. Finally, for security reasons, I recommend storing the password in the GCP-native Secret Manager instead of writing it directly in the code. Keep in mind that you may have to grant the Dataflow service account the role roles/secretmanager.secretAccessor (see here).
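
Putting points 1-4 together, a rough sketch of the refactored pipeline could look like the following. This is not your exact pipeline: the DoFn names, the smtp-password secret name, and the omission of the raw/count branches are assumptions; only the project, subscription, and table IDs are taken from the question.

import json
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def get_smtp_password(project_id):
    # Point 4: fetch the SMTP password from Secret Manager instead of
    # hard-coding it. Assumes a secret named "smtp-password" exists and the
    # Dataflow service account has roles/secretmanager.secretAccessor.
    from google.cloud import secretmanager
    client = secretmanager.SecretManagerServiceClient()
    name = f"projects/{project_id}/secrets/smtp-password/versions/latest"
    return client.access_secret_version(request={"name": name}).payload.data.decode("utf-8")


class MaskWithDlp(beam.DoFn):
    # Point 3: one small DoFn per concern instead of one huge DoFn; the email
    # and PII-count steps would become further small DoFns in the same way.
    def process(self, message):
        content_json = json.loads(message.decode("utf-8"))
        # ... call dlp_client.deidentify_content() as in the original code ...
        logging.info("Masked %s", content_json.get("audio_file_name"))
        yield content_json


def run():
    # Points 1 and 2: native ReadFromPubSub, no beam.Create dummy input; the
    # runner streams automatically because the source is unbounded.
    options = PipelineOptions()
    with beam.Pipeline(options=options) as p:
        masked = (
            p
            | "ReadFromPubSub" >> beam.io.ReadFromPubSub(
                subscription="projects/sandeepdev/subscriptions/audio_msg-sub")
            | "MaskWithDlp" >> beam.ParDo(MaskWithDlp())
        )
        masked | "WriteMasked" >> beam.io.WriteToBigQuery(
            "sandeepdev:call_interaction.cleansed_dlp_raw_table",
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)


if __name__ == "__main__":
    run()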