我试图找出我的AWS胶水工作指标的含义,以及可能的失败原因
从第二个图表我注意到驱动程序内存(蓝色)保持相对恒定,而一些执行程序波动。奇怪的是,似乎大多数遗嘱执行人都无能为力?这是为什么?
我的代码:我正在阅读Glue数据目录中的一堆航班信息,处理它并将其作为镶木地板文件写在S3中。在这种情况下,我使用了谓词下推。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import UserDefinedFunction, regexp_replace, to_timestamp
from datetime import datetime, timedelta
from pyspark.sql.types import ArrayType, DateType, Row
batches = [
"12929,14511,9968,15280,16162,17210,10193,14534,12542,13439,16122,9498,13301,12728,13303,16716,13311,12913,13531",
"12191",
"10658,11052,9970,13078,17388,10438,17396,17409,12917,17416,12118,12195",
"9921,9799,15253,16587,15412,17106,17368,13804,15461,19461,16923,16945,13164,9794,10031,25396,15422,10101,17002,14147,13180,11336,13428,9449,25405,16955,10180,11017,12795,12952,10485,12210,25336,17152,16516,16451,16437,15395,13947,10182,11893,11109",
"11036,17471,17482,16240,10902,17521,12071,12337,15526,17294,15671,12274,10858,10032",
"13113,11170,14213,18490,17402,10982,12392,12482,15168,9762,10871,11780,10284,10431,16743,15518,10497,13536,10724,14260,16747"
]
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# READ IN FLIGHTS, AIRPORTS, AGENTS TABLES
# NOTE: Bookmarks enabled for flights data catalog
airportsGDF = glueContext.create_dynamic_frame.from_catalog(database = "...", table_name = "airports")
airportsDF = airportsGDF.toDF()
airportsDF.createOrReplaceTempView("airports")
agentsGDF = glueContext.create_dynamic_frame.from_catalog(database = "...", table_name = "agents")
agentsRawDF = agentsGDF.toDF()
agentsRawDF.createOrReplaceTempView("agents_raw")
agentsDF = spark.sql("""
SELECT * FROM agents_raw
WHERE type IN ('Airline', 'TravelAgent')
""")
agentsDF.createOrReplaceTempView("agents")
for batch in batches:
flightsGDF = glueContext.create_dynamic_frame.from_catalog(database = "...", table_name = "flights", transformation_ctx="flights", push_down_predicate="""
querydatetime BETWEEN '2019-01-22' AND '2019-01-31'
AND querydestinationplace IN (%s)
""" % (batch))
flightsDf = flightsGDF.toDF()
flightsDf.createOrReplaceTempView("flights")
resultDf = spark.sql("""
SELECT
f.*, countryName, cityName, airportName, a.name AS agentName,
CONCAT(f.outboundlegid, '-', f.inboundlegid, '-', f.agent) AS key
FROM flights f
LEFT JOIN agents a
ON cast(f.agent as bigint) = a.id
LEFT JOIN airports p
ON cast(f.querydestinationplace as bigint) = p.airportId
""")
df = resultDf.withColumn("querydatetime", regexp_replace(resultDf["querydatetime"], "-", "").cast("int"))
df = resultDf.withColumn("queryoutbounddate", regexp_replace(resultDf["queryoutbounddate"], "-", "").cast("int"))
df = resultDf.withColumn("queryinbounddate", regexp_replace(resultDf["queryinbounddate"], "-", "").cast("int"))
df = resultDf.withColumn("outdeparture", to_timestamp(resultDf["outdeparture"], "yyyy-MM-ddTHH:mm:ss"))
df = resultDf.withColumn("outarrival", to_timestamp(resultDf["outarrival"], "yyyy-MM-ddTHH:mm:ss"))
df = resultDf.withColumn("indeparture", to_timestamp(resultDf["indeparture"], "yyyy-MM-ddTHH:mm:ss"))
df = resultDf.withColumn("inarrival", to_timestamp(resultDf["inarrival"], "yyyy-MM-ddTHH:mm:ss"))
print("===LOG:WRITING_RAW===")
df \
.write \
.mode("append") \
.partitionBy(["countryName", "querydatetime"]) \
.parquet("s3://...-glue/rawFlights")
print("===LOG:DONE_WRITING_RAW===")
df.createOrReplaceTempView("flights")
# GET DISTINCT DATASET
distinctKeysDf = resultDf.select(resultDf['key']).distinct
df.createOrReplaceTempView("distinctKeys")
def generate_date_series(start, stop):
return [start + timedelta(days=x) for x in range(0, (stop-start).days + 1)]
spark.udf.register("generate_date_series", generate_date_series, ArrayType(DateType()))
# GET RELAVENT DATES DATASET
# CREATE DF FOR PAST 90 DAYS EXCLUDING PAST 7 DAYS
today = datetime.utcnow().date()
start = today - timedelta(days = 25) # TODO: CHANGE TO 90
sevenDaysAgo = today - timedelta(days = 7)
print(">>> Generate data frame for ", start, " to ", sevenDaysAgo, "... ")
relaventDatesDf = spark.createDataFrame([
Row(start=start, stop=sevenDaysAgo)
])
relaventDatesDf.createOrReplaceTempView("relaventDates")
spark.sql("SELECT explode(generate_date_series(start, stop)) FROM relaventDates").show()
print("===LOG:WRITING_EXPANDED===")
expandedKeyDatesDf = spark.sql("""
SELECT key, querydatetime
FROM relaventDates
CROSS JOIN distinctKeys
""")
print("===LOG:DONE_WRITING_EXPANDED===")
expandedKeyDatesDf \
.coalesce(1) \
.write \
.mode("append") \
.parquet("s3://...-glue/expanded")
expandedKeyDatesDf.createOrReplaceTempView("expandedKeyDates")
cleanedFlightsDf = spark.sql("""
SELECT e.key AS master_key, e.querydatetime AS master_querydatetime, f.*
FROM expandedKeyDates e
LEFT JOIN flights f
ON e.key = f.key
AND e.querydatetime = f.querydatetime
ORDER BY e.key, e.querydatetime
""")
print("===LOG:WRITING_CLEANED===")
cleanedFlightsDf \
.write \
.mode("append") \
.partitionBy(["countryName", "querydatetime"]) \
.parquet("s3://...-glue/cleanedFlights")
print("===LOG:DONE_WRITING_CLEANED===")
print("===LOG:DONE BATCH %s" % (batch))
job.commit()
似乎没有尝试将扩展数据框写入镶木地板
expandedKeyDatesDf = spark.sql("""
SELECT key, querydatetime
FROM relaventDates
CROSS JOIN distinctKeys
""")
但是为什么因为这只是加入2,1列数据帧
此图表可以显示从头开始运行的执行程序。我想新的执行者是在中间发起的。转到CloudWatch Metrics,您会发现启动了很多执行程序。
只是为了记录提醒自己什么解决了问题或至少基于我记得的东西:
使用谓词下推来限制数据读取:
使用谓词下推来减少读取的数据量
flightsGDF = glueContext.create_dynamic_frame.from_catalog(database = "pinfare", table_name = "flights", transformation_ctx="flights", push_down_predicate="""
querydatetime BETWEEN '%s' AND '%s'
AND querydestinationplace IN (%s)
""" % (start.strftime("%Y-%m-%d"), today.strftime("%Y-%m-%d"), ",".join(map(lambda s: str(s), arr))))
并使用广播提示,如:
resultDf = spark.sql("""
WITH f (
SELECT
/*+ BROADCAST(h) */
/*+ COALESCE(12) */
CONCAT(f.outboundlegid, '-', f.inboundlegid, '-', f.agent) AS key,
f.querydatetime,
f.outboundlegid,
f.inboundlegid,
f.agent,
f.queryoutbounddate,
f.queryinbounddate,
f.price,
f.outdeparture,
f.outarrival,
f.indeparture,
f.inarrival,
f.querydestinationplace,
f.numberoutstops,
CASE WHEN type = 'HOLIDAY' AND (out_date BETWEEN start AND end)
THEN true
ELSE false
END out_is_holiday,
CASE WHEN type = 'LONG_WEEKENDS' AND (out_date BETWEEN start AND end)
THEN true
ELSE false
END out_is_longweekends,
CASE WHEN type = 'HOLIDAY' AND (in_date BETWEEN start AND end)
THEN true
ELSE false
END in_is_holiday,
CASE WHEN type = 'LONG_WEEKENDS' AND (in_date BETWEEN start AND end)
THEN true
ELSE false
END in_is_longweekends
FROM flights f
CROSS JOIN holidays h
)
SELECT
/*+ BROADCAST(a) */
/*+ BROADCAST(p) */
key,
querydatetime,
first(outboundlegid) as outboundlegid,
first(inboundlegid) as inboundlegid,
first(agent) as agent,
first(p.countryName) as countryName,
first(p.airportName) as airportName,
first(p.airportCode) as airportCode,
first(a.name) as agentName,
first(queryoutbounddate) as queryoutbounddate,
first(queryinbounddate) as queryinbounddate,
first(price) as price,
first(outdeparture) as outdeparture,
first(outarrival) as outarrival,
first(indeparture) as indeparture,
first(inarrival) as inarrival,
first(querydestinationplace) as querydestinationplace,
first(numberoutstops) as numberoutstops,
CASE WHEN array_contains(collect_set(out_is_holiday), true)
THEN 1
ELSE 0
END out_is_holiday,
CASE WHEN array_contains(collect_set(out_is_longweekends), true)
THEN 1
ELSE 0
END out_is_longweekends,
CASE WHEN array_contains(collect_set(in_is_holiday), true)
THEN 1
ELSE 0
END in_is_holiday,
CASE WHEN array_contains(collect_set(in_is_longweekends), true)
THEN 1
ELSE 0
END in_is_longweekends
FROM f
INNER JOIN agents a
ON f.agent = a.id
INNER JOIN airports p
ON f.querydestinationplace = p.airportId
GROUP BY
querydatetime,
key
""")