即使在成功完成代码完成后,AWS Glue也会发出错误

问题描述 投票:0回答:1

我在AWS Glue中创建和运行作业的Python代码是:

from datetime import datetime, timedelta
from time import sleep
import boto3
glue = boto3.client(
    service_name='glue',
    region_name='ap-south-1',
    endpoint_url='https://glue.ap-south-1.amazonaws.com'
)

myJob = glue.create_job(
    Name='sample',
    Role='Name_of_my_role',
    Command={
        'Name': 'glueetl',
        'ScriptLocation': 's3://s3-location'
    }
)

myNewJobRun = glue.start_job_run(JobName=myJob['Name'])

target_time = datetime.utcnow() + timedelta(minutes=5)
while(datetime.utcnow() < target_time):
    status = glue.get_job_run(JobName=myJob['Name'], RunId=myNewJobRun['JobRunId'])
    print status['JobRun']['JobRunState']
    sleep(30)

需要运行的脚本是:

print "Hello World!"
print "Sevilla lost against Messi FC!"

这是来自example。在完成作业后,它最终出现错误:Command failed with exit code 1并且在从控制台检查日志和错误日志时,我得到了我想要的结果,即脚本中的上述两行运行良好。这是错误日志;

Container: ****
LogType:stdout
Log Upload Time:Mon Feb 25 10:46:40 +0000 2019
LogLength:44
Log Contents:
Hello World!
Sevilla lost against Messi FC!
End of LogType:stdout
python amazon-web-services aws-glue
1个回答
0
投票

我忘了最后添加job.commit()。例如:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import DataFrameReader, DataFrameWriter
from datetime import datetime

import time

# @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

print("All imports were successful.")

df = spark.read.orc(
    's3://****'
)
print("First dataframe read with headers set to True")
df2 = spark.read.orc(
    's3://****'
)
print("Second dataframe read with headers set to True")

# df3 = df.join(df2, ['c_0'], "outer")

# df3 = df.join(
#     df2,
#     df["column_test_1"] == df2["column_1"],
#     "outer"
# )

df3 = df.alias('l').join(df2.alias('r'), on='c_0') #.collect()

print("Dataframes have been joined successfully.")
output_file_path = 's3://****'
)

df3.write.orc(
    output_file_path
)
print("Dataframe has been written to csv.")
job.commit()
© www.soinside.com 2019 - 2024. All rights reserved.