Getting error: Exception: java.sql.SQLException: Exception thrown in awaitResult:
The table flight_details is getting created, but it contains only a single dummy column, and the schema defined in the CREATE TABLE SQL query is not there. Please help, or is there any other way to create an empty table in Amazon Redshift using Glue?
import sys
import os
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
## @params: [JOB_NAME]
job_name = os.environ.get('AWS_GLUE_JOB_NAME', 'testing1')
args = getResolvedOptions(sys.argv, ['JOB_NAME','TempDir'])
temp_dir = args['TempDir']
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
#temp_dir = "s3://prateekproject1/temp_dir/"
redshift_url = "jdbc:redshift://flighttest.ckbw5q2qccz6.eu-north-1.redshift.amazonaws.com:5439/dev"
redshift_user = "awsuser"
redshift_password = "Prateek1997"
redshift_table = "flight.flight_details"
create_table_sql = f"""
CREATE TABLE IF NOT EXISTS {redshift_table}(
Carrier varchar(100),
OriginAirportID INT,
DestAirportID INT,
DepDelay INT,
ArrDelay INT
)
DISTSTYLE KEY
DISTKEY(OriginAirportID);
"""
try:
    empty_df = spark.createDataFrame([], schema="dummy STRING")
    spark._jvm.java.lang.Class.forName("com.amazon.redshift.jdbc42.Driver")
    glueContext.write_dynamic_frame.from_options(
        frame=DynamicFrame.fromDF(empty_df, glueContext, "empty_frame"),
        connection_type="redshift",
        connection_options={
            "url": redshift_url,
            "user": redshift_user,
            "password": redshift_password,
            "preactions": create_table_sql,  # executes the CREATE TABLE query before loading data
            "dbtable": "flight.flight_details",  # the target table name
            "redshiftTmpDir": temp_dir  # temporary directory for Redshift interactions
        }
    )
    database_name = "flight"  # replace with the Glue catalog database name
    table_name = "prateekproject1"
    df = glueContext.create_dynamic_frame.from_catalog(
        database=database_name,
        table_name=table_name
    )
    df.printSchema()
    dataframe = df.toDF()
    print("Sample Data:")
    dataframe.show(10)
    print("Table created successfully")
except Exception as e:
    print(e)
job.commit()
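When Glue writes a DynamicFrame to Redshift it first runs the preactions SQL and then loads the frame into the table named in dbtable, creating that table from the frame's own schema if needed, which is presumably why flight_details ended up with only the dummy column. The workaround below keeps the real CREATE TABLE (preceded by a DROP) in preactions, points dbtable at a throwaway dummy table instead of the real one, and drops that dummy table again in postactions: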
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType

# drop the table if it exists to ensure the changes are applied -> remove this later
create_table_sql = f"""
DROP TABLE IF EXISTS {redshift_table};
CREATE TABLE IF NOT EXISTS {redshift_table}(
Carrier varchar(100),
OriginAirportID INT,
DestAirportID INT,
DepDelay INT,
ArrDelay INT
)
DISTSTYLE KEY
DISTKEY(OriginAirportID);
"""
hardcoded_data = [Row(GarbageColumn="hardcoded_value")]
empty_df = spark.createDataFrame(hardcoded_data, schema=StructType([StructField("GarbageColumn", StringType(), True)]))
spark._jvm.java.lang.Class.forName("com.amazon.redshift.jdbc42.Driver")
glueContext.write_dynamic_frame.from_options(
    frame=DynamicFrame.fromDF(empty_df, glueContext, "empty_frame"),
    connection_type="redshift",
    connection_options={
        "url": redshift_url,
        "user": redshift_user,
        "password": redshift_password,
        "preactions": create_table_sql,  # runs the DROP/CREATE statements before loading data
        "postactions": "begin; drop table if exists temp_dummy_123123123; end;",
        "dbtable": "flight.temp_dummy_123123123",  # dummy table name: Glue creates this table from the frame automatically, so point the write at a throwaway table and drop it in the postactions
        "redshiftTmpDir": temp_dir  # temporary directory for Redshift interactions
    }
)
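Side note (not from the original answer, just a sketch): if the goal is only to create the empty table, you can skip the dummy write entirely and run the DDL straight from the Glue driver over JDBC. run_ddl below is a made-up helper name, and it assumes the Redshift JDBC driver loaded above is reachable through spark._jvm:

def run_ddl(spark, url, user, password, sql_script):
    # open a plain JDBC connection from the driver; the Class.forName call above
    # has already registered com.amazon.redshift.jdbc42.Driver
    conn = spark._jvm.java.sql.DriverManager.getConnection(url, user, password)
    try:
        stmt = conn.createStatement()
        # run one statement per execute() call (safer across JDBC drivers),
        # so split the script on ';' and skip empty fragments
        for statement in sql_script.split(";"):
            if statement.strip():
                stmt.execute(statement)
        stmt.close()
    finally:
        conn.close()

run_ddl(spark, redshift_url, redshift_user, redshift_password, create_table_sql)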
PS: if my answer helped you, an upvote would be appreciated...