I have implemented a pipeline in AWS where my data lands in a bucket named "input-bucket". Inside this bucket there is a folder containing various zipped files. I wrote a Glue job that unzips this data, converts it to CSV, and stores it in a target bucket.
Below is the code of my Glue job:
import boto3
import tempfile
from pyspark.sql import SparkSession
import zipfile
import os
import pyspark.sql.functions as f
import datetime
from pyspark.sql.functions import col
spark = (
    SparkSession.builder.appName("Unzip_Zipped_Files")
    .config("spark.driver.extraJavaOptions", "-Duser.timezone=GMT+5:30")
    .config("spark.executor.extraJavaOptions", "-Duser.timezone=GMT+5:30")
    .getOrCreate()
)


def list_folders_and_files(read_bucket_name, prefix):
    s3_client = boto3.client('s3')
    paginator = s3_client.get_paginator('list_objects_v2')
    page_iterator = paginator.paginate(Bucket=read_bucket_name, Prefix=prefix, Delimiter='/')
    target_bucket_name = "target-bucket"
    for page in page_iterator:
        # Recurse into sub-folders first
        for prefix in page.get('CommonPrefixes', []):
            folder_name = prefix.get('Prefix').rstrip('/')
            print(f"Found folder: {folder_name}")
            list_folders_and_files(read_bucket_name, folder_name + '/')
        # Then handle the zip files at the current level
        for obj in page.get('Contents', []):
            key = obj['Key']
            if key.endswith('.zip'):
                file_name = key.split('/')[-1]
                current_folder_name = key.split("/")[0]
                print(f"unextracted file name is {key}")
                unzip_and_upload_file(s3_client, read_bucket_name, target_bucket_name, key, current_folder_name)


def unzip_and_upload_file(s3_client, read_bucket_name, target_bucket_name, key, new_target_folder_key):
    with tempfile.TemporaryDirectory() as tmpdir:
        download_path = os.path.join(tmpdir, os.path.basename(key))
        s3_client.download_file(read_bucket_name, key, download_path)
        with zipfile.ZipFile(download_path, 'r') as zip_ref:
            zip_ref.extractall(tmpdir)
        for file in os.listdir(tmpdir):
            # Skip the downloaded zip itself; only process the extracted files
            if file != os.path.basename(key):
                file_name = file.split(".csv")[0]
                if file_name.lower() != "static_pool_data_ftd" and file_name != "mShakti_Usage_Report_FTD.xls":
                    new_key = new_target_folder_key + "/" + file
                    print(f"file to be uploaded under this path {new_key}")
                    # Upload the extracted file to the target destination bucket
                    s3_client.upload_file(os.path.join(tmpdir, file), target_bucket_name, new_key)
                    df = spark.read.option("header", "true").option("inferSchema", "true").csv(f"s3://{target_bucket_name}/{new_target_folder_key}/{file}", sep='|')
                    # result_df = df
                    result_df = df.withColumn("partition_date", f.lit(new_target_folder_key))
                    if file_name == "Loan_Closure_and_Foreclosure_Report_MTD":
                        clean_Loan_Closure_and_Foreclosure_Report_MTD_data(result_df, target_bucket_name, file_name, new_target_folder_key)
                    elif file_name == "NPA_Recovery_Report_MTD":
                        clean_NPA_recovery_report_rate(result_df, target_bucket_name, file_name, new_target_folder_key)
                    elif file_name == "customer_wise_disbursement_report_mtd":
                        result_df = result_df.withColumn("sales officer code", col("sales officer code").cast("double"))
                        result_df.coalesce(1).write.mode("overwrite").partitionBy("partition_date").option("header", "true").csv(f"s3://{target_bucket_name}/{file_name}/")
                    elif file_name == "collection_recon_report_mtd":
                        result_df = result_df.withColumn("total collection", col("total collection").cast("string"))
                        result_df.coalesce(1).write.mode("overwrite").partitionBy("partition_date").option("header", "true").csv(f"s3://{target_bucket_name}/{file_name}/")
                    else:
                        result_df.coalesce(1).write.mode("overwrite").partitionBy("partition_date").option("header", "true").csv(f"s3://{target_bucket_name}/{file_name}/")
                    print(f"s3://{target_bucket_name}/{new_target_folder_key}/{file} is the file read...")
                    # Remove the intermediate CSV that was only uploaded so Spark could read it
                    s3_client.delete_object(Bucket=target_bucket_name, Key=f"{new_target_folder_key}/{file}")
                    print(f"Objects deleted {new_target_folder_key}/{file}")


def clean_NPA_recovery_report_rate(df, target_bucket_name, file_name, new_target_folder_key):
    try:
        # Strip thousands separators so the amount column stays numeric
        df = df.withColumn("Outstanding amount as on NPA date", f.regexp_replace(f.col("Outstanding amount as on NPA date"), ",", ""))
        # df.coalesce(1).write.mode("overwrite").partitionBy("partition_date").option("header", "true").csv(f"s3://{target_bucket_name}/{file_name}/")
        df.coalesce(1).write.mode("overwrite").option("header", "true").csv(f"s3://{target_bucket_name}/{file_name}/{new_target_folder_key}/")
    except Exception as e:
        raise e


def clean_Loan_Closure_and_Foreclosure_Report_MTD_data(df, target_bucket_name, file_name, new_target_folder_key):
    try:
        # Remove stray quote characters from the ID columns
        for column in ["BRANCH CODE", "CUSTOMER NUMBER", "ACCOUNT NUMBER"]:
            df = df.withColumn(column, f.regexp_replace(f.col(column), "'", ""))
        # df.coalesce(1).write.mode("overwrite").partitionBy("partition_date").option("header", "true").csv(f"s3://{target_bucket_name}/{file_name}/")
        df.coalesce(1).write.mode("overwrite").option("header", "true").csv(f"s3://{target_bucket_name}/{file_name}/{new_target_folder_key}/")
    except Exception as e:
        raise e


if __name__ == "__main__":
    list_folders_and_files("input-bucket", "28092024")
After the job finishes, I run a crawler on the target bucket, which creates a Data Catalog table so that the data can be queried in Athena.
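Just for context, this is a minimal sketch of how the crawler run can be kicked off with boto3; the crawler name below is only a placeholder, the actual crawler is the one configured in the screenshot linked at the end of the question:

import boto3

# Placeholder crawler name; the real crawler is configured separately (see screenshot below)
glue_client = boto3.client("glue")
glue_client.start_crawler(Name="target-bucket-crawler")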
Querying the data works fine the first time. However, when I load the next day's data in OVERWRITE mode and query it in Athena, I get an error for one of the tables saying that the data type in the previous partition is BIGINT, while it is now declared as DOUBLE, which causes a mismatch.

I tried casting the column to DOUBLE for the newly arriving data, but I still get the same error, except that it now says the previous partition contains DOUBLE while the current data is BIGINT.
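For reference, the cast I tried is essentially the step that already appears in the job above; here is a minimal sketch of it on its own (the column name is illustrative, and the variables are the same ones used in the job):

from pyspark.sql.functions import col

# Force the numeric column that Athena complains about to DOUBLE before writing,
# so that every day's partition should carry the same type (column name is illustrative)
result_df = result_df.withColumn("sales officer code", col("sales officer code").cast("double"))
result_df.coalesce(1).write.mode("overwrite") \
    .partitionBy("partition_date") \
    .option("header", "true") \
    .csv(f"s3://{target_bucket_name}/{file_name}/")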
My end goal is to make sure that only the latest data is visible in Athena.
Could you tell me how to resolve this?
Here is a screenshot of my crawler: https://i.sstatic.net/bZufAYEU.png