HIVE_PARTITION_SCHEMA_MISMATCH: mismatch between table and partition schemas

Question

I have implemented a pipeline in AWS where my data lands in a bucket named "input-bucket". Inside this bucket there is a folder containing various zipped files. I wrote a Glue job that unzips this data, converts it to CSV format, and stores it in a target bucket.

Below is the code for my Glue job:

import boto3
import tempfile
from pyspark.sql import SparkSession
import zipfile
import os
import pyspark.sql.functions as f
import datetime
from pyspark.sql.functions import col

spark = (
    SparkSession.builder.appName("Unzip_Zipped_Files")
    .config("spark.driver.extraJavaOptions", "-Duser.timezone=GMT+5:30")
    .config("spark.executor.extraJavaOptions", "-Duser.timezone=GMT+5:30")
    .getOrCreate()
)

def list_folders_and_files(read_bucket_name, prefix):
    s3_client = boto3.client('s3')
    paginator = s3_client.get_paginator('list_objects_v2')
    page_iterator = paginator.paginate(Bucket=read_bucket_name, Prefix=prefix, Delimiter='/')
    target_bucket_name = "target-bucket"
    for page in page_iterator:
        for common_prefix in page.get('CommonPrefixes', []):
            folder_name = common_prefix.get('Prefix').rstrip('/')
            print(f"Found folder: {folder_name}")
            list_folders_and_files(read_bucket_name, folder_name + '/')
        for obj in page.get('Contents', []):
            key = obj['Key']
            if key.endswith('.zip'):
                file_name = key.split('/')[-1]
                current_folder_name = key.split("/")[0]
                print(f"unextracted file name is {key}")
                unzip_and_upload_file(s3_client, read_bucket_name, target_bucket_name, key, current_folder_name)
                    
def unzip_and_upload_file(s3_client, read_bucket_name, target_bucket_name, key, new_target_folder_key):
    with tempfile.TemporaryDirectory() as tmpdir:
        download_path = os.path.join(tmpdir, os.path.basename(key))
        s3_client.download_file(read_bucket_name, key, download_path)
        with zipfile.ZipFile(download_path, 'r') as zip_ref:
            zip_ref.extractall(tmpdir)
        for file in os.listdir(tmpdir):
            if file != os.path.basename(key):  # skip the downloaded zip itself
                file_name = file.split(".csv")[0]
                if file_name.lower() != "static_pool_data_ftd" and file_name != "mShakti_Usage_Report_FTD.xls":
                    new_key = new_target_folder_key + "/" + file
                    print(f"file to be uploaded under this path {new_key}")
                    # upload the extracted file to the target destination bucket, then read it back with Spark
                    s3_client.upload_file(os.path.join(tmpdir, file), target_bucket_name, new_key)
                    df = spark.read.option("header", "true").option("inferSchema", "true").csv(
                        f"s3://{target_bucket_name}/{new_target_folder_key}/{file}", sep='|'
                    )
                    result_df = df.withColumn("partition_date", f.lit(new_target_folder_key))
                    if file_name == "Loan_Closure_and_Foreclosure_Report_MTD":
                        clean_Loan_Closure_and_Foreclosure_Report_MTD_data(result_df, target_bucket_name, file_name, new_target_folder_key)
                    elif file_name == "NPA_Recovery_Report_MTD":
                        clean_NPA_recovery_report_rate(result_df, target_bucket_name, file_name, new_target_folder_key)
                    elif file_name == "customer_wise_disbursement_report_mtd":
                        result_df = result_df.withColumn("sales officer code", col("sales officer code").cast("double"))
                        result_df.coalesce(1).write.mode("overwrite").partitionBy("partition_date").option("header", "true").csv(f"s3://{target_bucket_name}/{file_name}/")
                    elif file_name == "collection_recon_report_mtd":
                        result_df = result_df.withColumn("total collection", col("total collection").cast("string"))
                        result_df.coalesce(1).write.mode("overwrite").partitionBy("partition_date").option("header", "true").csv(f"s3://{target_bucket_name}/{file_name}/")
                    else:
                        result_df.coalesce(1).write.mode("overwrite").partitionBy("partition_date").option("header", "true").csv(f"s3://{target_bucket_name}/{file_name}/")
                    print(f"s3://{target_bucket_name}/{new_target_folder_key}/{file} is the file read...")
                    # remove the intermediate CSV that was only needed for the Spark read
                    s3_client.delete_object(Bucket=target_bucket_name, Key=f"{new_target_folder_key}/{file}")
                    print(f"Objects deleted {new_target_folder_key}/{file}")


def clean_NPA_recovery_report_rate(df, target_bucket_name, file_name, new_target_folder_key):
    try:
        df = df.withColumn("Outstanding amount as on NPA date", f.regexp_replace(f.col("Outstanding amount as on NPA date"), ",", ""))
        # df.coalesce(1).write.mode("overwrite").partitionBy("partition_date").option("header","true").csv(f"s3://{target_bucket_name}/{file_name}/")
        df.coalesce(1).write.mode("overwrite").option("header","true").csv(f"s3://{target_bucket_name}/{file_name}/{new_target_folder_key}/")
    except Exception as e:
        raise e    

def clean_Loan_Closure_and_Foreclosure_Report_MTD_data(df, target_bucket_name, file_name, new_target_folder_key):
    try:
        for column in ["BRANCH CODE","CUSTOMER NUMBER","ACCOUNT NUMBER"]:
            df = df.withColumn(column, f.regexp_replace(f.col(column), "'", ""))
        # df.coalesce(1).write.mode("overwrite").partitionBy("partition_date").option("header","true").csv(f"s3://{target_bucket_name}/{file_name}/")
        df.coalesce(1).write.mode("overwrite").option("header","true").csv(f"s3://{target_bucket_name}/{file_name}/{new_target_folder_key}/")
    except Exception as e:
        raise e

if __name__=="__main__":
    list_folders_and_files("input-bucket","28092024")

After running the job, I run a crawler on the target bucket, which creates a Data Catalog table so that the data can easily be queried in Athena.
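For reference, kicking off the crawler after the job can be done with boto3 along these lines (a minimal sketch; "target-bucket-crawler" is a placeholder for the actual crawler name):

import boto3

glue = boto3.client("glue")

# "target-bucket-crawler" is a placeholder; use the crawler that scans the target bucket.
glue.start_crawler(Name="target-bucket-crawler")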

This worked fine the first time I queried the data. However, when I loaded the next day's data in OVERWRITE mode and queried it in Athena, I ran into an error for one of the tables saying that the data type in the previous partition is BIGINT while it is now declared as DOUBLE, causing a mismatch.

I tried casting the data type to DOUBLE (for the newly arriving data), but I still get the same error, now indicating that the previous partition contains DOUBLE while the current data is BIGINT.
My end goal is to make sure that only the latest data shows up in Athena.

Could you tell me how to resolve this?

Here is a picture of my crawler: https://i.sstatic.net/bZufAYEU.png

amazon-web-services apache-spark amazon-athena
1 Answer

Given that you are working with CSV files, where column order is not guaranteed, you probably still have another partition that uses the old schema, so you need to update the schema of all partitions by checking these options:

[screenshot: Glue crawler schema update options]
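If you prefer to set this from code rather than the console, the same behaviour can be configured with boto3 (a sketch, assuming a crawler named "target-bucket-crawler"; the key settings are a schema change policy that updates the table and a crawler configuration that makes partitions inherit the table schema):

import json
import boto3

glue = boto3.client("glue")

# "target-bucket-crawler" is a placeholder for your crawler's name.
glue.update_crawler(
    Name="target-bucket-crawler",
    SchemaChangePolicy={
        "UpdateBehavior": "UPDATE_IN_DATABASE",  # let schema changes update the table
        "DeleteBehavior": "LOG",
    },
    # Have new and existing partitions inherit the table schema, which clears
    # per-partition BIGINT/DOUBLE mismatches on the next crawl.
    Configuration=json.dumps({
        "Version": 1.0,
        "CrawlerOutput": {"Partitions": {"AddOrUpdateBehavior": "InheritFromTable"}},
    }),
)

# Re-run the crawler so the updated policy is applied to the existing partitions.
glue.start_crawler(Name="target-bucket-crawler")

After the crawler finishes, querying the table in Athena should see one consistent schema across all partitions.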
