Unable to create a Redshift table using AWS Glue

Question · Votes: 0 · Answers: 1

Getting the error: Exception: java.sql.SQLException: Exception thrown in awaitResult:

The table flight_details is being created, but it ends up with only a single dummy column, and the schema defined in the CREATE TABLE SQL statement is not applied. Please help, or is there any other way to create an empty table in Amazon Redshift using Glue?

import sys
import os
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

## @params: [JOB_NAME]
job_name = os.environ.get('AWS_GLUE_JOB_NAME', 'testing1')
args = getResolvedOptions(sys.argv, ['JOB_NAME','TempDir'])
temp_dir = args['TempDir']
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

#temp_dir = "s3://prateekproject1/temp_dir/"
redshift_url = "jdbc:redshift://flighttest.ckbw5q2qccz6.eu-north-1.redshift.amazonaws.com:5439/dev"
redshift_user = "awsuser"
redshift_password = "Prateek1997"
redshift_table = "flight.flight_details"
create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {redshift_table}(
    Carrier varchar(100),
    OriginAirportID INT,
    DestAirportID   INT,
    DepDelay    INT,
    ArrDelay    INT
    )
    DISTSTYLE KEY
    DISTKEY(OriginAirportID);
 """
 
try: 
    empty_df = spark.createDataFrame([], schema="dummy STRING")
    spark._jvm.java.lang.Class.forName("com.amazon.redshift.jdbc42.Driver")
    glueContext.write_dynamic_frame.from_options(
        frame=DynamicFrame.fromDF(empty_df, glueContext, "empty_frame"),
        connection_type="redshift",
        connection_options={
            "url": redshift_url,
            "user": redshift_user,
            "password": redshift_password,
            "preactions": create_table_sql,  # Executes the CREATE TABLE query before loading data
            "dbtable": "flight.flight_details",  # The table name
            "redshiftTmpDir": temp_dir  # Temporary directory for Redshift interactions
            }
        )
    database_name = "flight"  # Replace with the database name
    table_name = "prateekproject1"
    df = glueContext.create_dynamic_frame.from_catalog(
    database = database_name,
    table_name = table_name
    )
    df.printSchema()
    dataframe = df.toDF()
    print("Sample Data:")
    dataframe.show(10)
    print("Table created succesfully")
except Exception as e:
    print(e)
job.commit()
pyspark amazon-redshift aws-glue
1 Answer

0 votes
The idea is to let Glue write its dummy frame to a throwaway table, create the real table in preactions, and then drop the throwaway table again in postactions:

# drop the table if it exists to ensure the changes are applied -> remove this later
create_table_sql = f"""
    DROP TABLE IF EXISTS {redshift_table};
    CREATE TABLE IF NOT EXISTS {redshift_table}(
    Carrier varchar(100),
    OriginAirportID INT,
    DestAirportID   INT,
    DepDelay    INT,
    ArrDelay    INT
    )
    DISTSTYLE KEY
    DISTKEY(OriginAirportID);
 """
 
    
    from pyspark.sql import Row
    from pyspark.sql.types import StructType, StructField, StringType

    hardcoded_data = [Row(GarbageColumn="hardcoded_value")]
    empty_df = spark.createDataFrame(hardcoded_data, schema=StructType([StructField("GarbageColumn", StringType(), True)]))
    
    spark._jvm.java.lang.Class.forName("com.amazon.redshift.jdbc42.Driver")
    
    glueContext.write_dynamic_frame.from_options(
        frame=DynamicFrame.fromDF(empty_df, glueContext, "empty_frame"),
        connection_type="redshift",
        connection_options={
            "url": redshift_url,
            "user": redshift_user,
            "password": redshift_password,
            "preactions": create_table_sql,  # Executes the CREATE TABLE query before loading data
            "postactions": "begin; drop table if exists temp_dummy_123123123; end;" 
            "dbtable": "flight.temp_dummy_123123123",  # use a dummy name -> this will be automatically created by Glue. so make it a dummy and then delete it in 
            "redshiftTmpDir": temp_dir  # Temporary directory for Redshift interactions
            }
        )
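
If the goal is only to run the DDL and no data needs to be loaded, another route is to skip the dummy write entirely and execute the statement over a plain JDBC connection through the JVM that Spark already exposes. This is a minimal sketch, not part of the original answer; it assumes the Redshift JDBC driver class used above is available on the Glue job's classpath, and it reuses redshift_url, redshift_user, redshift_password and create_table_sql from the question.

# Hedged sketch: run the CREATE TABLE directly over JDBC via py4j,
# assuming com.amazon.redshift.jdbc42.Driver is on the classpath.
jvm = spark._jvm
jvm.java.lang.Class.forName("com.amazon.redshift.jdbc42.Driver")  # load the driver
conn = jvm.java.sql.DriverManager.getConnection(redshift_url, redshift_user, redshift_password)
try:
    stmt = conn.createStatement()
    # If the SQL contains several statements (e.g. DROP plus CREATE),
    # execute them one at a time instead of as a single string.
    stmt.execute(create_table_sql)
    stmt.close()
finally:
    conn.close()

Autocommit is on by default for a fresh JDBC connection, so the DDL takes effect immediately; whether the driver class resolves depends on your Glue version, so treat this as a starting point rather than a drop-in replacement.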
    

PS: if my answer helped you, feel free to upvote it...
