I am trying to run the code below on Dataflow. I have defined 2-3 functions in a class; two of them work, but send_email() does not work and does not throw any error either. Please help me fix the issue.
# Dataflow pipeline code to run
import apache_beam as beam
import os
import argparse
import logging
import pandas as pd
import datetime
import pytz
from oauth2client.client import GoogleCredentials
from datetime import datetime,date,timedelta
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import time
import json
import requests
from google.cloud import pubsub_v1
import google.cloud.dlp_v2
from json import loads
import smtplib
from email.utils import formataddr
#from google.cloud import dlp_v2
from google.protobuf import json_format
class readandwrite(beam.DoFn):
def send_email(self,content_json):
smtp_email = "[email protected]"
smtp_password = "password"
email_subject_bigquery = "PII Information Detected!"
email_body_bigquery = f'''
Dear BigQuery Admin,
We want to inform you that personally identifiable information (PII) has been detected in a transaction.
- Duration: {content_json["duration"]} seconds
        - Text: "{content_json["text"]}"
        Appropriate actions have been taken by the DLP team to secure the data.
Thank you,
DataLake DLP Admin Team,
sush Ltd.
'''
try:
with smtplib.SMTP("smtp.gmail.com", 587) as server:
server.starttls()
server.login(smtp_email, smtp_password)
sender_name = "DLP DataLake Admin Team"
sender_address = "[email protected]"
formatted_sender = formataddr((sender_name, sender_address))
email_message_bigquery = f"From: {formatted_sender}\nSubject: {email_subject_bigquery}\nTo: [email protected]\n\n{email_body_bigquery}"
server.sendmail(smtp_email, "[email protected]", email_message_bigquery)
print("Emails sent successfully.")
except Exception as e:
print("Error sending emails:", str(e))
def deidentify_content_with_dlp(self,content_json):
import google.cloud.dlp_v2
# Existing code for DLP de-identification
dlp_client = google.cloud.dlp_v2.DlpServiceClient()
item = {"value": json.dumps(content_json)}
credit_card_info_type = {"name": "CREDIT_CARD_NUMBER"}
phone_number_info_type = {"name": "PHONE_NUMBER"}
# The transformation configuration to de-identify the content.
deidentify_config = {
"info_type_transformations": {
"transformations": [
{
"info_types": [credit_card_info_type],
"primitive_transformation": {
"character_mask_config": {
"masking_character": "#",
"number_to_mask": 7,
"reverse_order": True,
}
},
},
{
"info_types": [phone_number_info_type],
"primitive_transformation": {
"character_mask_config": {
"masking_character": "#",
"number_to_mask": 7,
"reverse_order": True,
}
}
},
]
}
}
# Convert the project ID into a full resource ID.
project_id = "sandeepdev"
parent = f"projects/{project_id}"
# Call the API to inspect and de-identify the content.
try:
response = dlp_client.deidentify_content(
request={
"parent": parent,
"deidentify_config": deidentify_config,
"item": item,
}
)
# Check if PII information was found by DLP
logging.info("Applying DLP de-identification...")
if response.item.value and response.overview.transformation_summaries:
print("PII information found. Creating ServiceNow incident.")
self.send_email(content_json)
print("Email send to BigQuery Admin")
            logging.info("DLP de-identification completed...")
            return json.loads(response.item.value) if response.item.value else content_json
        except Exception as e:
            print(f"Error: {e}")
            print("Error during de-identification. Inserting original content to BigQuery.")
            return content_json  # In case of an error, insert the original content into BigQuery
    def process(self, context):
import time
import json
import requests
from google.cloud import pubsub_v1
import google.cloud.dlp_v2
from json import loads
import smtplib
from email.utils import formataddr
from google.cloud import bigquery
project_id = "sandeepdev"
subscription_id = "audio_msg-sub"
client_bigquery = bigquery.Client()
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
masked_table_id = "sandeepdev.call_interaction.cleansed_dlp_raw_table"
raw_table_id="sandeepdev.call_interaction.raw_audio_data"
dlp_count_table_id="sandeepdev.call_interaction.dlp_count"
max_messages = 1
count=0
        rows_to_insert_raw = []
        rows_to_insert_masked = []
        rows_to_insert_dlp_count = []
logging.info("Starting data processing workflow...")
while True:
response = subscriber.pull(request={"subscription": subscription_path, "max_messages": max_messages})
for received_message in response.received_messages:
message = received_message.message
content_json = json.loads(message.data.decode('utf-8'))
masked_data = self.deidentify_content_with_dlp(content_json)
logging.info(masked_data)
audio_file_name=masked_data['audio_file_name']
print("masked data is :" , masked_data)
dlp_list= masked_data['text'].split(" ")
for value in dlp_list:
if '#' in value:
count += 1
print("masked data count:", count)
insert_data = {
"audio_file_name": audio_file_name,
"PII_count": count
}
rows_to_insert_dlp_count.append(insert_data)
load_PII_count = client_bigquery.insert_rows_json(dlp_count_table_id, rows_to_insert_dlp_count)
count=0
##extract audio file name and the count value and insert to another bigquery table
subscriber.acknowledge(request={"subscription": subscription_path, "ack_ids": [received_message.ack_id]})
#inserts original data to a different bigquery table
rows_to_insert_raw.append(content_json)
load_raw = client_bigquery.insert_rows_json(raw_table_id, rows_to_insert_raw)
# Insert the masked data into dlp_masked_data table
rows_to_insert_masked.append(masked_data)
load_masked = client_bigquery.insert_rows_json(masked_table_id, rows_to_insert_masked)
rows_to_insert_raw = [] # Clear the list after inserting the rows
rows_to_insert_masked = [] # Clear the list after inserting the rows
rows_to_insert_dlp_count=[]
logging.info("Data processing workflow completed.")
time.sleep(2)
def run():
try:
parser = argparse.ArgumentParser()
parser.add_argument(
'--dfBucket',
required=True,
help= ('Bucket where JARS/JDK is present')
)
known_args, pipeline_args = parser.parse_known_args()
global df_Bucket
df_Bucket = known_args.dfBucket
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(StandardOptions).streaming = True
pcoll = beam.Pipeline(options=pipeline_options)
logging.info("Pipeline Starts")
dummy= pcoll | 'Initializing..' >> beam.Create(['1'])
dummy_env = dummy | 'Processing' >> beam.ParDo(readandwrite())
p=pcoll.run()
logging.info('Job Run Successfully!')
p.wait_until_finish()
except Exception as e:
        logging.exception('Failed to launch datapipeline: %s', str(e))
raise
if __name__ == '__main__':
run()
In this setup the de-identification happens and the data is loaded into BigQuery, but the email function does not work. When I run it locally with the DirectRunner it works as expected, but not with the DataflowRunner.
Run command:
python3 /home/sandeepdev751/dlp_dataflow.py --runner DataflowRunner --project sandeepdev --region us-central1 --job_name dlpcheckerv5 --temp_location gs://sandeepdev/temp --dfBucket "sandeepdev" --setup_file /home/sandeepdev751/setup.py
Without any logs it is hard to debug, but my guess is: your Dataflow service account has no access to the SMTP server.
With your current setup you will never see any error, because Dataflow does not log any print statements. You can see them when using the DirectRunner, but not with the DataflowRunner. Just switch to logging and you will start to see the issues; a minimal sketch of that change follows below.
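For illustration, here is what send_email() could look like with logging instead of print. This is a hedged sketch, not a drop-in replacement: send_email_with_logging is a hypothetical standalone version of your method, and the addresses and credentials are placeholders from your snippet.

# sketch: send_email with logging instead of print
import logging
import smtplib

def send_email_with_logging(smtp_email, smtp_password, message):
    try:
        with smtplib.SMTP("smtp.gmail.com", 587) as server:
            server.starttls()
            server.login(smtp_email, smtp_password)
            server.sendmail(smtp_email, "[email protected]", message)
        logging.info("Email sent successfully.")
    except Exception:
        # logging.exception records the message plus the full traceback,
        # so an SMTP connectivity failure becomes visible in Cloud Logging.
        logging.exception("Error sending email")

On Dataflow workers, messages logged at INFO and above show up in the job logs in Cloud Logging, so a blocked SMTP connection would no longer fail silently.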
Apart from that, I strongly suggest refactoring your code. There are several things that go against the Beam framework. Use the ReadFromPubSub source (see here) instead of pulling messages yourself. While the while loop works, you will run into lots of problems once you want to drain the pipeline; furthermore, if you use Dataflow, the native Pub/Sub input is optimized by Google. With ReadFromPubSub you also no longer need the beam.Create dummy input, and you can skip the streaming=True option, since the pipeline will natively stream with Pub/Sub as its input. On the other hand, your p.wait_until_finish() will not make much sense anymore. Just search for Pub/Sub ingestion pipelines; there are plenty of examples out there and it is not hard. A rough sketch of such a refactor is shown below.
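The following sketch only illustrates the shape of that refactor, under a few assumptions: the subscription path reuses your project and subscription IDs, and ProcessMessage is a placeholder DoFn where your DLP and BigQuery logic would go.

# sketch: native Pub/Sub ingestion instead of pulling in a while loop
import json
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class ProcessMessage(beam.DoFn):
    def process(self, element):
        # element is the raw Pub/Sub payload (bytes); your
        # deidentify_content_with_dlp() call and BigQuery inserts
        # would go here, one element per message.
        content_json = json.loads(element.decode('utf-8'))
        logging.info("Received message: %s", content_json)
        yield content_json

def run():
    options = PipelineOptions()
    p = beam.Pipeline(options=options)
    (
        p
        | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(
            subscription='projects/sandeepdev/subscriptions/audio_msg-sub')
        | 'Process' >> beam.ParDo(ProcessMessage())
    )
    # Launch the streaming job; blocking on wait_until_finish() makes
    # little sense for an unbounded pipeline.
    p.run()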
Finally, do not hardcode the SMTP password in the source. Store it in Secret Manager instead and grant the Dataflow worker service account roles/secretmanager.secretAccessor (see here).
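As a minimal sketch, assuming a secret named smtp-password exists in your project and the google-cloud-secret-manager package is installed on the workers:

# sketch: fetch the SMTP password from Secret Manager at runtime
from google.cloud import secretmanager

def get_smtp_password(project_id='sandeepdev', secret_id='smtp-password'):
    # secret_id is a hypothetical name; the worker service account needs
    # roles/secretmanager.secretAccessor on this secret to read it.
    client = secretmanager.SecretManagerServiceClient()
    name = f"projects/{project_id}/secrets/{secret_id}/versions/latest"
    response = client.access_secret_version(request={'name': name})
    return response.payload.data.decode('utf-8')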