我在AWS Glue中创建和运行作业的Python代码是:
from datetime import datetime, timedelta
from time import sleep
import boto3
# Glue client for the ap-south-1 region, pinned to that region's endpoint.
glue = boto3.client(
    service_name='glue',
    region_name='ap-south-1',
    endpoint_url='https://glue.ap-south-1.amazonaws.com',
)

# Register the job definition; ScriptLocation points at the script Glue runs.
myJob = glue.create_job(
    Name='sample',
    Role='Name_of_my_role',
    Command={
        'Name': 'glueetl',
        'ScriptLocation': 's3://s3-location',
    },
)

# Start a single run of the job that was just created.
myNewJobRun = glue.start_job_run(JobName=myJob['Name'])

# JobRunState values after which the run will not change any more, so there
# is no point in polling further once one of them is seen.
_TERMINAL_STATES = {
    'SUCCEEDED',
    'FAILED',
    'STOPPED',
    'TIMEOUT',
    'ERROR',
    'EXPIRED',
}

# Poll the run state every 30 seconds for at most 5 minutes, printing each
# observed state, and stop early as soon as the run has finished.
target_time = datetime.utcnow() + timedelta(minutes=5)
while datetime.utcnow() < target_time:
    status = glue.get_job_run(
        JobName=myJob['Name'],
        RunId=myNewJobRun['JobRunId'],
    )
    state = status['JobRun']['JobRunState']
    print(state)
    if state in _TERMINAL_STATES:
        break
    sleep(30)
需要运行的脚本是:
# Minimal job script: writes two fixed lines to stdout.
# print() with a single argument behaves the same on Python 2 and Python 3.
print("Hello World!")
print("Sevilla lost against Messi FC!")
以上代码来自一个示例(example)。作业运行结束后,最终报错:Command failed with exit code 1
不过,在控制台查看日志和错误日志时,我看到了预期的输出,即脚本中的上述两行都已正常执行。以下是错误日志:
Container: ****
LogType:stdout
Log Upload Time:Mon Feb 25 10:46:40 +0000 2019
LogLength:44
Log Contents:
Hello World!
Sevilla lost against Messi FC!
End of LogType:stdout
我忘了在脚本末尾添加 job.commit()。例如:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import DataFrameReader, DataFrameWriter
from datetime import datetime
import time
# Glue ETL job script: reads two ORC datasets, joins them on column c_0,
# writes the result back out as ORC, and commits the job.

# @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# Set up the Spark / Glue contexts and initialise the job with its arguments.
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
print("All imports were successful.")

# NOTE(review): the two messages below mention headers, but the ORC reads
# are called with a path only -- confirm whether the wording is intended.
df = spark.read.orc('s3://****')
print("First dataframe read with headers set to True")

df2 = spark.read.orc('s3://****')
print("Second dataframe read with headers set to True")

# Join the two aliased frames on the shared column c_0; no join type is
# passed, so Spark's default applies.
df3 = df.alias('l').join(df2.alias('r'), on='c_0')
print("Dataframes have been joined successfully.")

# Destination for the joined data (the original had a stray ')' here, which
# was a syntax error).
output_file_path = 's3://****'
df3.write.orc(output_file_path)
print("Dataframe has been written to ORC.")

# Commit the job as the final step; pairs with job.init() above.
job.commit()