在 Bigquery 中写入表时出现 Pyspark 性能问题

问题描述 投票:0回答:1

我是 PySpark 世界的新手,在将数据从数据帧写入 Bigquery 中的表时遇到严重的性能问题。我已经尝试了我读过的所有内容、建议、使用重新分区缓存检查点等。但到目前为止没有任何效果。

在下面的示例中,我展示了仅向目标表写入 50,000 行,大约需要 4 分钟。在正常情况下,这应该在几秒钟内完成。如果我考虑到我必须加载到表中的数据库有超过 1700 万行,那么这次就非常糟糕了。

最有可能的是,因为我是 PySpark 的新手,所以我做错了什么,或者我错误地配置了集群和/或 Spark 会话。如果您能帮我解决这个问题,我将非常感激。下面我将放置代码、配置以及控制台输出。

提前非常感谢您,任何帮助都很有价值!

控制台的输出:

:: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-avro_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-2d1f06bd-2136-4034-af8f-166c97149fde;1.0
    confs: [default]
    found org.apache.spark#spark-avro_2.12;3.1.2 in central
    found org.spark-project.spark#unused;1.0.0 in central
downloading https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.1.2/spark-avro_2.12-3.1.2.jar ...
    [SUCCESSFUL ] org.apache.spark#spark-avro_2.12;3.1.2!spark-avro_2.12.jar (26ms)
downloading https://repo1.maven.org/maven2/org/spark-project/spark/unused/1.0.0/unused-1.0.0.jar ...
    [SUCCESSFUL ] org.spark-project.spark#unused;1.0.0!unused.jar (13ms)
:: resolution report :: resolve 1369ms :: artifacts dl 44ms
    :: modules in use:
    org.apache.spark#spark-avro_2.12;3.1.2 from central in [default]
    org.spark-project.spark#unused;1.0.0 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   2   |   2   |   2   |   0   ||   2   |   2   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-2d1f06bd-2136-4034-af8f-166c97149fde
    confs: [default]
    2 artifacts copied, 0 already retrieved (171kB/7ms)
24/08/22 23:31:59 INFO org.sparkproject.jetty.util.log: Logging initialized @7535ms to org.sparkproject.jetty.util.log.Slf4jLog
24/08/22 23:32:00 INFO org.sparkproject.jetty.server.Server: jetty-9.4.40.v20210413; built: 2021-04-13T20:42:42.668Z; git: b881a572662e1943a14ae12e7e1207989f218b74; jvm 1.8.0_312-b07
24/08/22 23:32:00 INFO org.sparkproject.jetty.server.Server: Started @7654ms
24/08/22 23:32:00 INFO org.sparkproject.jetty.server.AbstractConnector: Started ServerConnector@4f7fd6eb{HTTP/1.1, (http/1.1)}{0.0.0.0:35025}
24/08/22 23:32:00 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at pipelines-dl-prod-cluster-encryption-m/10.128.0.92:8032
24/08/22 23:32:01 INFO org.apache.hadoop.yarn.client.AHSProxy: Connecting to Application History server at pipelines-dl-prod-cluster-encryption-m/10.128.0.92:10200
24/08/22 23:32:01 INFO org.apache.hadoop.conf.Configuration: resource-types.xml not found
24/08/22 23:32:01 INFO org.apache.hadoop.yarn.util.resource.ResourceUtils: Unable to find 'resource-types.xml'.
24/08/22 23:32:03 WARN org.apache.spark.deploy.yarn.Client: Same path resource file:///root/.ivy2/jars/org.apache.spark_spark-avro_2.12-3.1.2.jar added multiple times to distributed cache.
24/08/22 23:32:03 WARN org.apache.spark.deploy.yarn.Client: Same path resource file:///root/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar added multiple times to distributed cache.
24/08/22 23:32:03 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted application application_1724369481359_0001
24/08/22 23:32:04 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at pipelines-dl-prod-cluster-encryption-m/10.128.0.92:8030
24/08/22 23:32:06 INFO com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl: Ignoring exception of type GoogleJsonResponseException; verified object already exists with desired state.

SQL Query: 

SELECT
    cust_key_id AS cust_key_id_corp
  , contact_point_txt AS contact_point_txt_corp
  , priority
  , sc_exec_dttm
  , DATE('2024-08-22')-1 AS PARTITIONDATE
FROM `mycompany-datalake-prod.svw_bi_corp_cl_cust_contac_prd_acc_fal_cl_contactability_prod.svw_vw_smartcontact_phone_rfm`
WHERE DATE(sc_exec_dttm) = DATE('2024-08-22')
LIMIT 10000

Columns from corp to retail: ['cust_key_id_corp', 'contact_point_txt_corp']
Aliases from corp to retail: ['cust_key_id_retail', 'contact_point_txt_retail']
Columns to clean first characters: {'cust_key_id_corp': '4'}
Output Table: mycompany-datalake-prod.cl_entities_preprod.btd_fal_cl_smartcontact_phone_rfm_retail
Output Table Columns Order: {'1': 'cust_key_id_corp', '2': 'contact_point_txt_corp', '3': 'cust_key_id_retail', '4': 'contact_point_txt_retail', '5': 'priority', '6': 'sc_exec_dttm', '7': 'PARTITIONDATE'}

----------------------------------------------------------------------------------------
Retail Key: Try to access the secret manager
Obtained default credentials for the project mycompany-datalake-prod

----------------------------------------------------------------------------------------
Retail Key: access OK

----------------------------------------------------------------------------------------
Retail Key: Trying to create the udf

----------------------------------------------------------------------------------------
Retail Key: udf OK

----------------------------------------------------------------------------------------
Corp Key: Try to access the secret manager
Obtained default credentials for the project mycompany-datalake-prod

----------------------------------------------------------------------------------------
Corp Key: access OK

----------------------------------------------------------------------------------------
Corp Key: Trying to create the udf

----------------------------------------------------------------------------------------
Corp Key: udf OK

----------------------------------------------------------------------------------------
Start reading the query

24/08/22 23:32:17 INFO com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation: |Querying table mycompany-datalake-prod.wrk_dataproc_processes._bqc_9180ed867c5a4cc0ab125aed6c815771, parameters sent from Spark:|requiredColumns=[cust_key_id_corp,contact_point_txt_corp,priority,sc_exec_dttm,PARTITIONDATE],|filters=[]
24/08/22 23:32:19 INFO com.google.cloud.bigquery.connector.common.ReadSessionCreator: Read session:{"readSessionName":"projects/mycompany-datalake-prod/locations/us/sessions/CAISDDJmw2NBoCanIaAmpmndzRkNzZKU","readSessionCreationStartTime":"2024-08-22T23:32:17.204Z","readSessionCreationEndTime":"2024-08-22T23:32:19.604Z","readSessionPrepDuration":1218,"readSessionCreationDuration":1182,"readSessionDuration":2400}
24/08/22 23:32:19 INFO com.google.cloud.bigquery.connector.common.ReadSessionCreator: Requested 20000 max partitions, but only received 1 from the BigQuery Storage API for session projects/mycompany-datalake-prod/locations/us/sessions/CAISDDJmw2NBoCanIaAmpmndzRkNzZKU. Notice that the number of streams in actual may be lower than the requested number, depending on the amount parallelism that is reasonable for the table and the maximum amount of parallelism allowed by the system.
24/08/22 23:32:19 INFO com.google.cloud.spark.bigquery.direct.BigQueryRDDFactory: Created read session for table 'mycompany-datalake-prod.wrk_dataproc_processes._bqc_9180ed867c5a4cc0ab125aed6c815771': projects/mycompany-datalake-prod/locations/us/sessions/CAISDDJmw2NBoCanIaAmpmndzRkNzZKU

+------------------------+------------------------+--------+--------------------------+-------------+
|cust_key_id_corp        |contact_point_txt_corp  |priority|sc_exec_dttm              |PARTITIONDATE|
+------------------------+------------------------+--------+--------------------------+-------------+
|uRpSOh/XU5nNv8FTb8PyIQ==|4drYa5U6wKW/m1rJKkEVZA==|1       |2024-08-22T12:35:52.954362|2024-08-21   |
|tbk2mU5xJVhBtlwATHPd+g==|NAWiFY5u+pUxpegY6qQrZw==|1       |2024-08-22T12:35:52.954362|2024-08-21   |
|zINNfvoYV8/oDhgjbooGsA==|G16xHElQz/ATlNP5I3105g==|1       |2024-08-22T12:35:52.954362|2024-08-21   |
|s+l08Eji3Pe8m3mxKIQc6g==|sB/9MfFJTeSfH+ovhpGGJw==|1       |2024-08-22T12:35:52.954362|2024-08-21   |
|rXXoKSqnB1hPo3QWQ+ddzg==|rkoZS+GcXtiXlESGHLkEYw==|1       |2024-08-22T12:35:52.954362|2024-08-21   |
+------------------------+------------------------+--------+--------------------------+-------------+
only showing top 5 rows

24/08/22 23:32:25 INFO com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation: |Querying table mycompany-datalake-prod.wrk_dataproc_processes._bqc_9180ed867c5a4cc0ab125aed6c815771, parameters sent from Spark:|requiredColumns=[cust_key_id_corp,contact_point_txt_corp,priority,sc_exec_dttm,PARTITIONDATE],|filters=[]
24/08/22 23:32:26 INFO com.google.cloud.bigquery.connector.common.ReadSessionCreator: Read session:{"readSessionName":"projects/mycompany-datalake-prod/locations/us/sessions/CAISDHcwMHlKUk5EaTR6ZxoCanIaAmpm","readSessionCreationStartTime":"2024-08-22T23:32:25.768Z","readSessionCreationEndTime":"2024-08-22T23:32:26.125Z","readSessionPrepDuration":89,"readSessionCreationDuration":268,"readSessionDuration":357}
24/08/22 23:32:26 INFO com.google.cloud.bigquery.connector.common.ReadSessionCreator: Requested 20000 max partitions, but only received 1 from the BigQuery Storage API for session projects/mycompany-datalake-prod/locations/us/sessions/CAISDHcwMHlKUk5EaTR6ZxoCanIaAmpm. Notice that the number of streams in actual may be lower than the requested number, depending on the amount parallelism that is reasonable for the table and the maximum amount of parallelism allowed by the system.
24/08/22 23:32:26 INFO com.google.cloud.spark.bigquery.direct.BigQueryRDDFactory: Created read session for table 'mycompany-datalake-prod.wrk_dataproc_processes._bqc_9180ed867c5a4cc0ab125aed6c815771': projects/mycompany-datalake-prod/locations/us/sessions/CAISDHcwMHlKUk5EaTR6ZxoCanIaAmpm

Time to read the query: 19.12794518470764

Start decryption columns corp to retail
Time for decryption columns corp to retail: 0.6136214733123779

Start decryption columns retail to corp
Time for decryption columns retail to corp: 0.22461867332458496

Start column sorting for output table
Time for column sorting for output table: 0.04193377494812012

Total row number: 10000

Start write to output table

24/08/22 23:36:07 INFO com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem: Successfully repaired 'gs://dataproc-datalake-a01350f1-0bef-0bef-0bef-20bf078e6d86/.spark-bigquery-application_1748135243699_0001-fd3aa81e-8bb5--e04b-438c59d9a5c52599/' directory.
24/08/22 23:36:08 INFO com.google.cloud.bigquery.connector.common.BigQueryClient: Submitted job LoadJobConfiguration{type=LOAD, destinationTable=GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=cl_entities_preprod, projectId=mycompany-datalake-prod, tableId=btd_fal_cl_smartcontact_phone_rfm_retail}}, decimalTargetTypes=null, destinationEncryptionConfiguration=null, createDisposition=CREATE_IF_NEEDED, writeDisposition=WRITE_TRUNCATE, formatOptions=FormatOptions{format=PARQUET}, nullMarker=null, maxBadRecords=null, schema=Schema{fields=[Field{name=cust_key_id_corp, type=STRING, mode=null, description=null, policyTags=null, maxLength=null, scale=null, precision=null, defaultValueExpression=null, collation=null}, Field{name=contact_point_txt_corp, type=STRING, mode=null, description=null, policyTags=null, maxLength=null, scale=null, precision=null, defaultValueExpression=null, collation=null}, Field{name=cust_key_id_retail, type=STRING, mode=null, description=null, policyTags=null, maxLength=null, scale=null, precision=null, defaultValueExpression=null, collation=null}, Field{name=contact_point_txt_retail, type=STRING, mode=null, description=null, policyTags=null, maxLength=null, scale=null, precision=null, defaultValueExpression=null, collation=null}, Field{name=priority, type=INTEGER, mode=null, description=null, policyTags=null, maxLength=null, scale=null, precision=null, defaultValueExpression=null, collation=null}, Field{name=sc_exec_dttm, type=STRING, mode=null, description=null, policyTags=null, maxLength=null, scale=null, precision=null, defaultValueExpression=null, collation=null}, Field{name=PARTITIONDATE, type=DATE, mode=null, description=null, policyTags=null, maxLength=null, scale=null, precision=null, defaultValueExpression=null, collation=null}]}, ignoreUnknownValue=null, sourceUris=[gs://dataproc-datalake-a01350f1-0bef-0bef-0bef-20bf078e6d86/.spark-bigquery-application_1748135243699_0001-fd3aa81e-8bb5--e04b-438c59d9a5c52599/part-0000*-21e9f1c5-8887-4b80-8c75-07e99361b532-c000.snappy.parquet], schemaUpdateOptions=null, autodetect=null, timePartitioning=null, clustering=null, useAvroLogicalTypes=null, labels=null, jobTimeoutMs=null, rangePartitioning=null, hivePartitioningOptions=null, referenceFileSchemaUri=null}. jobId: JobId{project=mycompany-datalake-prod, job=6123dc1e-44fa-bb87-8c77-32e2c099ff59, location=US}
24/08/22 23:36:11 INFO com.google.cloud.bigquery.connector.common.BigQueryClient: Done loading to mycompany-datalake-prod.cl_entities_preprod.btd_fal_cl_smartcontact_phone_rfm_retail. jobId: JobId{project=mycompany-datalake-prod, job=6123dc1e-44fa-bb87-8c77-32e2c099ff59, location=US}

Time to write to output table: 222.45286417007446

----------------------------------------------------------------------------------------

Total script execution time: 242.77862739562988
24/08/22 23:36:11 INFO org.sparkproject.jetty.server.AbstractConnector: Stopped Spark@4f7fd6eb{HTTP/1.1, (http/1.1)}{0.0.0.0:0}
apache-spark pyspark google-bigquery google-cloud-dataproc
1个回答
0
投票

首先:我创建一个 Dataproc 集群并从 Airflow 中的 Dag 发送工作

CLUSTER_ENCRIPTION_NAME='pipelines-dl-prod-cluster-encryption'
CLUSTER_ENCRIPTION_SCOPES='cloud-platform' 
CLUSTER_ENCRIPTION_NUM_WORKERS=3
CLUSTER_ENCRIPTION_NUM_SECONDARY_WORKERS=1
CLUSTER_ENCRIPTION_REGION='us-central1'
CLUSTER_ENCRIPTION_ZONE='us-central1-a'
CLUSTER_ENCRIPTION_MASTER_MACHINE_TYPE='e2-standard-8'
CLUSTER_ENCRIPTION_WORKER_MACHINE_TYPE='e2-highcpu-16'
CLUSTER_ENCRIPTION_MASTER_DISK_TYPE='pd-ssd'
CLUSTER_ENCRIPTION_MASTER_DISK_SIZE='100'
CLUSTER_ENCRIPTION_WORKER_DISK_TYPE='pd-ssd'
CLUSTER_ENCRIPTION_SECONDARY_WORKER_DISK_TYPE='pd-ssd'
CLUSTER_ENCRIPTION_WORKER_DISK_SIZE='100'
CLUSTER_ENCRIPTION_SERVICE_ACCOUNT_COMPOSER ='theservice_account@mycompany-datalake-prod.iam.gserviceaccount.com'
CLUSTER_ENCRIPTION_SERVICE_ACCOUNT_PIPELINES='theservice_account@mycompany-datalake-prod.iam.gserviceaccount.com'
CLUSTER_ENCRIPTION_IMAGE_VERSION='2.0.29-debian10' 
CLUSTER_ENCRIPTION_MAX_IDLE='60m'
CLUSTER_ENCRIPTION_PROPERTIES="^#^yarn:yarn.log-aggregation-enable=true,#spark:spark.jars.packages=org.apache.spark:spark-avro_2.12:3.1.2,#dataproc:pip.packages=Unidecode==1.3.7,google-cloud-secret-manager==2.11.0,pbkdf2==1.3,pynacl==1.5.0,pyAesCrypt==6.1.1,pycryptodome==3.20.0,cryptography==41.0.5"
CLUSTER_ENCRIPTION_AUTOSCALING_POLICY='projects/mycompany-datalake-prod/regions/us-central1/autoscalingPolicies/scaling_policy_contact'

   create_dataproc_cluster_encryption = BashOperator(
    task_id=f'create_dataproc_cluster_encryption',
    bash_command="""
        gcloud config set project {project} 
        gcloud config set account {service_account_composer}
        cluster_status=$(gcloud dataproc clusters list \\
                            --project {project} \\
                            --region {region} \\
                            --filter='status.state=ACTIVE AND clusterName={cluster_name}')
        if [ -z "$cluster_status" ]
        then
            echo "Creating cluster {cluster_name} --project {project} --region {region}."
            gcloud beta dataproc clusters create {cluster_name} \\
                --enable-component-gateway \\
                --subnet default \\
                --region {region} \\
                --zone {zone} \\
                --master-machine-type {master_machine_type} \\
                --master-boot-disk-type {master_disk_type} \\
                --master-boot-disk-size {master_disk_size} \\
                --num-workers {num_workers} \\
                --worker-machine-type {worker_machine_type} \\
                --worker-boot-disk-type {worker_disk_type} \\
                --worker-boot-disk-size {worker_disk_size} \\
                --secondary-worker-type 'non-preemptible' \\
                --num-secondary-workers {num_secondary_workers} \\
                --secondary-worker-boot-disk-type {secondary_worker_disk_type} \\
                --secondary-worker-boot-disk-size {worker_disk_size} \\
                --image-version {image_version} \\
                --project {project} \\
                --service-account={service_account_pipelines} \\
                --scopes={scopes} \\
                --properties '{properties}' \\
                --max-idle {max_idle} \\
                --autoscaling-policy '{autoscaling_policy}'
        else
            echo "Cluster {cluster_name} already exists in --project {project} --region {region}."
        fi
        """.format(cluster_name=CLUSTER_ENCRIPTION_NAME, 
                   region=CLUSTER_ENCRIPTION_REGION,       
                   zone=CLUSTER_ENCRIPTION_ZONE,    
                   scopes=CLUSTER_ENCRIPTION_SCOPES,     
                   master_machine_type=CLUSTER_ENCRIPTION_MASTER_MACHINE_TYPE, 
                   master_disk_type=CLUSTER_ENCRIPTION_MASTER_DISK_TYPE, 
                   master_disk_size=CLUSTER_ENCRIPTION_MASTER_DISK_SIZE,
                   num_workers=CLUSTER_ENCRIPTION_NUM_WORKERS,
                   num_secondary_workers=CLUSTER_ENCRIPTION_NUM_SECONDARY_WORKERS,
                   worker_machine_type=CLUSTER_ENCRIPTION_WORKER_MACHINE_TYPE, 
                   worker_disk_type=CLUSTER_ENCRIPTION_WORKER_DISK_TYPE, 
                   worker_disk_size=CLUSTER_ENCRIPTION_WORKER_DISK_SIZE,
                   secondary_worker_disk_type= CLUSTER_ENCRIPTION_SECONDARY_WORKER_DISK_TYPE,
                   image_version=CLUSTER_ENCRIPTION_IMAGE_VERSION,
                   project=PROJECT_ID,
                   service_account_composer=CLUSTER_ENCRIPTION_SERVICE_ACCOUNT_COMPOSER,
                   service_account_pipelines=CLUSTER_ENCRIPTION_SERVICE_ACCOUNT_PIPELINES,
                   properties=CLUSTER_ENCRIPTION_PROPERTIES,
                   autoscaling_policy=CLUSTER_ENCRIPTION_AUTOSCALING_POLICY,
                   max_idle=CLUSTER_ENCRIPTION_MAX_IDLE))

params = {
    "sql_query": read_query(f"{ENTITY_PATH}/btd_fal_co_smartcontact_phone_rfm_retail.sql"),
    "columns_and_alias_from_corp_to_retail": {
        'cust_key_id_corp': 'cust_key_id_retail',
        'contact_point_txt_corp': 'contact_point_txt_retail'
    },
    "columns_and_alias_from_retail_to_corp": {},
    "columns_to_clean_first_characters" : {"cust_key_id_corp": "4"},
    "output_table": f'{DESTINATION_PROJECT_ID}.{DESTINATION_DATASET_ID}.{DESTINATION_TABLE_ID}',
    "output_table_columns_order": {
        1: 'cust_key_id_corp',
        2: 'contact_point_txt_corp',
        3: 'cust_key_id_retail',
        4: 'contact_point_txt_retail',
        5: 'priority',
        6: 'sc_exec_dttm',
        7: 'PARTITIONDATE'
    }
}

job_re_encript_corp_to_retail = DataprocSubmitJobOperator(
        task_id=f'job_re_encript_corp_to_retail',
        job={
            "reference": {"project_id": PROJECT_ID, 
                          "job_id": f"job_{COUNTRY}_{INGESTION_KEY}_{PROCESS_DATE}_{uuid.uuid4()}"},
            "placement": {"cluster_name": CLUSTER_ENCRIPTION_NAME},
            "pyspark_job": {"main_python_file_uri": "gs://dataproc-datalake-a01350f1-0bef-0bef-0bef-20bf078e6d86/dataproc-jobs/encrypt-decrypt/spark-scripts/spark_re_encrypt_corp_retail.py",
                            "args": [json.dumps(params)],
                            "jar_file_uris": ["gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"],
                            },
                            "labels": {"dag": DAG_NAME, "task": "job_re_encript_corp_to_retail",
                                       "bu_project": INGESTION_PROYECT}
        },
        region="us-central1",
        retries=5,
        project_id=PROJECT_ID
    ) 

第二部分,pyspark代码:

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, udf, substring, expr, row_number
from google.cloud import secretmanager
import base64
import hashlib
from Crypto.Cipher import AES
from Crypto.Protocol.KDF import PBKDF2
import sys
from google.cloud import bigquery
import logging
import json
from google.auth import default
from google.auth.transport.requests import Request
import base64
import binascii
import json
import logging
import os
from Crypto import Random
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad , unpad
from pbkdf2 import PBKDF2
from google.cloud import secretmanager_v1
import google.auth
import google.auth.impersonated_credentials
import traceback
from pyspark import SparkContext
import time
from time import localtime, strftime
import subprocess
from pyspark.sql.window import Window

PROJECT_ID = 'mycompany-datalake-prod'
GCS_WORKING_BUCKET = 'dataproc-datalake-a01350f1-0bef-0bef-0bef-20bf078e6d86'
GCS_TEMPORRARY_BUCKET = 'gs://dataproc-datalake-a01350f1-0bef-0bef-0bef-20bf078e6d86/dataproc-jobs/encrypt-decrypt/staging'
BQ_NATERIALIZATION_DATASET = 'wrk_dataproc_processes'
BQ_TEMPORARY_DATASET = 'wrk_dataproc_processes'

NUM_PARTITIONS = 10

GCS_BUCKET = "entity-datalake-f6639638-b893-44fa-ae66-b8f50729e10d"
GCS_AVRO_PATH = "co/co_entities/product/data/2024/08/04/*.avro"
API_ENDPOINT = "https://api.example.com/endpoint"
CORP_KEY =   "projects/special_corp_project/secrets/encrypt-corp/versions/latest"
RETAIL_KEY = "projects/mycompany-datalake-prd/secrets/encrypt-retail/versions/latest"
SA_FOR_RETAIL_KEY = "sa_for_retail_key@mycompany-datalake-prod.iam.gserviceaccount.com"
SA_FOR_CORP_KEY = "theservice_account@mycompany-datalake-prod.iam.gserviceaccount.com"

spark = SparkSession.builder \
    .appName("Email and Phone Validation") \
    .config('parentProject', PROJECT_ID) \
    .getOrCreate()


spark.conf.set("viewsEnabled","true")
spark.conf.set("materializationDataset",BQ_NATERIALIZATION_DATASET)
spark.conf.set("temporaryGcsBucket",GCS_TEMPORRARY_BUCKET)

sc = spark.sparkContext
sc = SparkContext.getOrCreate()
sc.setCheckpointDir(GCS_TEMPORRARY_BUCKET)

project_id = PROJECT_ID
retail_secret_id = 'encrypt-retail'
corp_secret_id = 'encrypt-corp'
project_retail_secret = 'mycompany-datalake-encrypt-prd'
project_corp_secret = 'corporate-secrets'

logger = logging.getLogger(__name__)

def delete_temporary_tables(dataset_id):
    client = bigquery.Client()

    # Listar todas las tablas en el dataset
    tables = client.list_tables(dataset_id)

    for table in tables:
        table_id = f"{dataset_id}.{table.table_id}"
        client.delete_table(table_id)
        print(f"Deleted table {table_id}")

def access_secret_version(secret_id):
    creds, pid = google.auth.default()
    print(f"Obtained default credentials for the project {pid}")
    if secret_id == RETAIL_KEY:
        SA_FOR_KEY = SA_FOR_RETAIL_KEY
    elif secret_id == CORP_KEY:
        SA_FOR_KEY = SA_FOR_CORP_KEY
    tcreds = google.auth.impersonated_credentials.Credentials(
        source_credentials=creds,
        target_principal=SA_FOR_KEY,
        target_scopes='https://www.googleapis.com/auth/cloud-platform'
    )
    
    client = secretmanager_v1.SecretManagerServiceClient(credentials=tcreds)

    # Access the secret version
    response = client.access_secret_version(name=secret_id)

    # Get the payload as a string
    secret_payload = response.payload.data.decode("UTF-8")

    return secret_payload

class AesCrypt256:
    # Based on https://gist.github.com/pfote/5099161
    BLOCK_SIZE = 16

    # To use the null/x00 byte array for the IV
    default_initialization_vector = False

    def __init__(self, default_initialization_vector=False):
        self.default_initialization_vector = default_initialization_vector

    def pkcs7_pad(self, s):
        s = s.encode("utf-8") if isinstance(s, str) else s
        pad_len = self.BLOCK_SIZE - (len(s) % self.BLOCK_SIZE)
        return s + bytes([pad_len]) * pad_len

    def pkcs7_unpad(self, s):
        return s[: -s[-1]]

    def _encrypt(self, key, value, iv):
        cipher = AES.new(key, AES.MODE_CBC, iv)
        value = value.encode("utf-8") if isinstance(value, str) else value
        crypted = cipher.encrypt(self.pkcs7_pad(value))

        # check if empty/null initialization vector, and do not prepend if null
        if all(v == 0 for v in iv):
            return crypted
        else:
            # prepend the initialization vector
            return iv + crypted

    def _decrypt(self, key, value, iv):
        cipher = AES.new(key, AES.MODE_CBC, iv)
        # unpad the bytes, throw away garbage at end
        return self.pkcs7_unpad(cipher.decrypt(value)).decode("utf-8")

    def encrypt(self, key, value):
        if self.default_initialization_vector:
            return self._encrypt(key, value, bytes(bytearray(16)))
        else:
            iv = Random.get_random_bytes(16)
            return self._encrypt(key, value, iv)

    def decrypt(self, key, value):
        if self.default_initialization_vector:
            # we do not have an IV present
            default_iv = bytes(bytearray(16))
            return self._decrypt(key, value, default_iv)
        else:
            iv = value[:16]
            crypted = value[16:]
            return self._decrypt(key, crypted, iv)

    def encryptHex(self, key, value):
        return (binascii.hexlify(self.encrypt(key, value))).decode("utf-8")

    def decryptHex(self, key, value):
        return self.decrypt(key, binascii.unhexlify(value))

class BasicEncript:
    def __init__(self, passp, salt):
        self.passp = passp
        self.salt = binascii.unhexlify(salt)
        iterations = 1024
        self.key = PBKDF2(
            passphrase=self.passp, salt=self.salt, iterations=iterations
        ).read(32)

    @property
    def aes(self):
        return AesCrypt256(default_initialization_vector=True)

    def encrypt(self, data):
        if data == None:
            return
        try:
            crypted = self.aes.encryptHex(self.key, data)
            return crypted
        except Exception as e:
            logging.error(f"error al encriptar valor: {data}  , error: {e}")

    def decrypt(self, data):
        if data == None:
            return
        try:
            crypted = self.aes.decryptHex(self.key, data)
            return crypted
        except Exception as e:
            logging.error(f"error al encriptar valor: {data}  , error: {e}")

class Encrypter:
    def __init__(self, key, iv):
        key_rtl = key
        iv_rtl = iv
        self.crypt_retail = BasicEncript(key_rtl, iv_rtl)

    def get_encrypter(self):
        return self.crypt_retail

    def encrypt(self, msj: str):
        return self.crypt_retail.encrypt(msj)

    def decrypt(self, msj: str):
        return self.crypt_retail.decrypt(msj)

class AwkEncryptation:
  def __init__(self,key,iv,block_size):
    self.key = key
    self.iv = iv
    self.block_size = block_size

  def decrypt(self,message):
    if message != None:
      try:
        enc = base64.b64decode(message)
        obj = AES.new(self.key, AES.MODE_CBC, self.iv)
        return unpad(obj.decrypt(enc), self.block_size).decode('utf-8')
      except Exception as e:
        return None
    return message

  def encrypt(self,message):
    if message != None:
      message_pad = pad(str(message).encode('utf-8'),self.block_size)
      obj = AES.new(self.key, AES.MODE_CBC, self.iv)
      msg_encrypt = obj.encrypt(message_pad)
      base = base64.b64encode(msg_encrypt)
      return base.decode('utf-8')
    return message

def call_mode(awk, mode, value):

    if mode == "encrypt": 
        return awk.encrypt(value)

    if mode == "decrypt":
        return awk.decrypt(value)

def process(key_type, key_values, call_mode_type, value):
    key_values = json.loads(key_values)
    key_values = {"key": key_values["passEncrypt"], "iv": key_values["salt"]}
    awk = None
    try:
        if key_type == "RETAIL_KEY":
            awk = Encrypter(key=key_values["key"], iv=key_values["iv"])

        elif key_type == "CORP_KEY":
            awk = AwkEncryptation(base64.b64decode(key_values['key']), base64.b64decode(key_values['iv']), 16)

        return_value = call_mode(awk, call_mode_type, value)

        return return_value
    except Exception as e:
        print(f"process NOK")
        print("------------------------------------------------------------------------------------------")
        print(traceback.format_exc())
        return logging.error(f"errorMessage: {str(e)}")

def decode_columns(df: DataFrame, columns_to_decode: list):
    for column in columns_to_decode:
        df = df.withColumn(column, col(column).cast("string"))  # Example transformation
    return df

def encode_columns(df: DataFrame, columns_to_decode: list):
    for column in columns_to_decode:
        df = df.withColumn(column, col(column).cast("string"))  # Example transformation
    return df

def load_data_from_bigquery_with_query(query: str):
    spark = SparkSession.builder \
        .appName("BQLoader") \
        .config('parentProject', 'mycompany-datalake-prod') \
        .config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.31.1") \
        .getOrCreate()
    
    table = 'mycompany-datalake-prod.co_entities.contactability_base'
    columns_to_select = ["CUST_NUM_ID", "EMAIL_DEF", "TELEPHONE_DEF", "EMAIL_CORP_DEF", "TELEPHONE_CORP_DEF"]
    filter = "_partitiondate = CURRENT_DATE('America/Bogota') - 1 AND (EMAIL_DEF IS NOT NULL AND TELEPHONE_DEF IS NOT NULL)"
    df = spark.read.format('bigquery').option('table', table) \
        .option('filter',filter) \
        .load() \
        .select(*columns_to_select) 

    return df

txt = "\n----------------------------------------------------------------------------------------\n"

def get_udfs_for_encryption():
    try:
        try:
            print(f"{txt}Retail Key: Try to access the secret manager")
            latest_retail_key = access_secret_version(RETAIL_KEY)
            print(f"{txt}Retail Key: access OK")
            try:
                print(f"{txt}Retail Key: Trying to create the udf")
                udf_retail_decrypt = udf(lambda value: process("RETAIL_KEY", latest_retail_key, "decrypt", value))
                udf_retail_encrypt = udf(lambda value: process("RETAIL_KEY", latest_retail_key, "encrypt", value))
                print(f"{txt}Retail Key: udf OK")
            except Exception as e:
                print(f"{txt}Retail Key: udf NOK")
                print(traceback.format_exc())
        except Exception as e:
            print(f"{txt}Retail Key: access denied")
            print(traceback.format_exc())
        try:
            print(f"{txt}Corp Key: Try to access the secret manager")
            latest_corp_key = access_secret_version(CORP_KEY)
            print(f"{txt}Corp Key: access NOK")
            try:
                print(f"{txt}Corp Key: Trying to create the udf")
                udf_corp_decrypt = udf(lambda value: process("CORP_KEY", latest_corp_key, "decrypt", value))
                udf_corp_encrypt = udf(lambda value: process("CORP_KEY", latest_corp_key, "encrypt", value))
                print(f"{txt}LLave corp: udf OK")
            except Exception as e:
                print(f"{txt}Corp Key: udf NOK")
                print(traceback.format_exc())
        except Exception as e:
            print(f"{txt}Corp Key: access denied")
            print(traceback.format_exc())

        return udf_retail_decrypt, udf_retail_encrypt, udf_corp_decrypt, udf_corp_encrypt

    except Exception as e:
            print(traceback.format_exc())

def decrypt_encript_query(
        sql_query, output_table, columns_and_alias_from_corp_to_retail, columns_and_alias_from_retail_to_corp, output_table_columns_order,
        columns_to_clean_first_characters):
    
    udf_retail_decrypt, udf_retail_encrypt, udf_corp_decrypt, udf_corp_encrypt = get_udfs_for_encryption()   
    print(f"{txt}Start reading the query")
    t1 = time.time()
    # Leer los datos desde BigQuery
    df = spark.read.format('bigquery') \
            .option('query', sql_query) \
            .option('use_legacy_sql', 'false') \
            .load()
    
    df.show(5, truncate=False)

    df = df.repartition(NUM_PARTITIONS)
    df.cache()
    size_df = df.count() 
    t2 = time.time()
    print(f"Time to read the query: {t2-t1}")

    print(f"Start decryption columns corp to retail")
    # Desencriptar las columnas necesarias y re-encriptarlas con la nueva llave
    for column_corp, alias_retail in columns_and_alias_from_corp_to_retail.items():
        decrypted_col = udf_corp_decrypt(col(column_corp))  # Desencriptar la columna 
        
        # Verificar si la columna debe limpiarse
        if column_corp in columns_to_clean_first_characters:
            num_chars_to_clean = int(columns_to_clean_first_characters[column_corp])
            truncated_col = substring(decrypted_col, num_chars_to_clean + 1, 256)  # Eliminar los primeros caracteres
            encrypted_col = udf_retail_encrypt(truncated_col)  # Re-encriptar la columna después de truncarla
        else:
            encrypted_col = udf_retail_encrypt(decrypted_col)
        
        df = df.withColumn(alias_retail, encrypted_col)  # Añadir la columna re-encriptada al DataFrame
            
    size_df = df.count()
    t3 = time.time()
    print(f"Time for decryption columns corp to retail: {t3-t2}")

    print(f"Start decryption columns retail to corp")
    for column_retail, alias_corp in columns_and_alias_from_retail_to_corp.items():
        df = df.withColumn(alias_corp, udf_corp_encrypt(udf_retail_decrypt(col(column_retail))))
    
    size_df = df.count()
    t4 = time.time()
    print(f"Time for decryption columns retail to corp: {t4-t3}")

    print(f"Start column sorting for output table")
    # Ordenar las columnas según `output_table_columns_order`
    ordered_columns = [output_table_columns_order[i] for i in sorted(output_table_columns_order.keys())]
    df = df.select(*ordered_columns)
    
    t5 = time.time()
    print(f"Time for column sorting for output table: {t5-t4}")

    size_df = df.count()
    #df.show(5, truncate=False)
    print(f"Total row number: {size_df}")

    print(f"Start write to output table")
    # Guardar los resultados en la tabla de salida
    df.write.format('bigquery') \
        .option('table', output_table) \
        .option("temporaryGcsBucket", GCS_WORKING_BUCKET) \
        .option('writeDisposition', 'WRITE_TRUNCATE') \
        .mode('overwrite') \
        .save()
    
    t6 = time.time()
    print(f"Time to write to output table: {t6-t5}")
 
params_json = sys.argv[1]
params = json.loads(params_json)

sql_query = params.get('sql_query')
columns_and_alias_from_corp_to_retail = params.get('columns_and_alias_from_corp_to_retail', {})
columns_and_alias_from_retail_to_corp = params.get('columns_and_alias_from_retail_to_corp', {})
output_table = params.get('output_table')
output_table_columns_order = params.get('output_table_columns_order', {})
columns_to_clean_first_characters = params.get('columns_to_clean_first_characters', {})

if __name__ == "__main__":
    try:
        t1 = time.time()
        print(f"SQL Query: {sql_query}")
        print(f"Columns from corp to retail: {list(columns_and_alias_from_corp_to_retail.keys())}")
        print(f"Aliases from corp to retail: {list(columns_and_alias_from_corp_to_retail.values())}")
        print(f"Columns to clean first characters: {columns_to_clean_first_characters}")
        print(f"Output Table: {output_table}")
        print(f"Output Table Columns Order: {output_table_columns_order}")

        decrypt_encript_query(
            sql_query=sql_query,
            output_table=output_table,
            columns_and_alias_from_corp_to_retail=columns_and_alias_from_corp_to_retail,
            columns_and_alias_from_retail_to_corp=columns_and_alias_from_retail_to_corp,
            output_table_columns_order=output_table_columns_order,
            columns_to_clean_first_characters=columns_to_clean_first_characters
        )
        t2 = time.time()
        print(f"{txt}Total script execution time: {t2-t1}")
        spark.stop()
    except Exception as e:
        print("An error occurred:")
        print(traceback.format_exc())
        spark.stop()
        raise
© www.soinside.com 2019 - 2024. All rights reserved.