I'm using PySpark on AWS Glue to run an ETL script like the following:
```
select
date as date,
hour as hour,
trip_id as trip_id,
first(user_id) as user_id,
first(vehicle_id) as vehicle_id,
count(ct) as total_driving_time,
max(ct_list) as ct_list,
max(sp_list) as sp_list,
max(lt_list) as lt_list,
max(ln_list) as ln_list
from (
select date, hour, user_id, vehicle_id, trip_id, ct,
collect_list(ct) OVER (PARTITION BY date, hour, trip_id ORDER BY ct) as ct_list,
collect_list(sp) OVER (PARTITION BY date, hour, trip_id ORDER BY ct) as sp_list,
collect_list(latitude) OVER (PARTITION BY date, hour, trip_id ORDER BY ct) as lt_list,
collect_list(longitude) OVER (PARTITION BY date, hour, trip_id ORDER BY ct) as ln_list
from table_temp
) t
group by date, hour, trip_id;
```
The data is partitioned on both the source and the target, but a single partition holds quite a lot of data: the source S3 bucket has 26 GB of data (711396000 records), while the result written to the target S3 bucket is only 3 rows. Here is the Spark configuration I'm using:
conf.set("spark.hadoop.parquet.read.allocation.size", "134217728")
conf.set("spark.files.maxPartitionBytes", "536870912")
conf.set("spark.sql.files.maxPartitionBytes", "536870912")
conf.set("spark.hadoop.fs.s3a.fast.upload", "true")
conf.set("spark.hadoop.fs.s3a.fast.upload.buffer", "bytebuffer")
conf.set("spark.hadoop.fs.s3a.endpoint", "s3.<region-name>.amazonaws.com")
conf.set("spark.sql.execution.pythonUDF.arrow.enabled", "True")
df.coalesce(1).write.option("parquet.block.size",512*1024*1024).format("parquet").mode(write_mode).save(f"s3a://{buket_name}/{table_name}")`
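For context, a minimal sketch of how a configuration like this is typically wired into a Glue job, from SparkConf through GlueContext to the temp view the query reads from; the input path and all names here are placeholders, not taken from the original script:

```
from pyspark import SparkConf, SparkContext
from awsglue.context import GlueContext

# Build the configuration before the SparkContext is created (subset of the
# settings shown above, as an example).
conf = SparkConf()
conf.set("spark.sql.files.maxPartitionBytes", "536870912")
conf.set("spark.hadoop.fs.s3a.fast.upload", "true")

sc = SparkContext(conf=conf)            # a Glue job creates this once
glue_context = GlueContext(sc)
spark = glue_context.spark_session      # SparkSession used for spark.sql(...)

# Placeholder input path, following the <...> convention used above.
source_df = spark.read.parquet("s3a://<bucket>/<input-prefix>/")
source_df.createOrReplaceTempView("table_temp")   # view name the query refers to
```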
The window operations are expensive; the same transformation can be achieved with a plain GROUP BY:
```
SELECT
    date,
    hour,
    trip_id,
    FIRST(user_id) AS user_id,
    FIRST(vehicle_id) AS vehicle_id,
    COUNT(ct) AS total_driving_time,
    COLLECT_LIST(ct) AS ct_list,
    COLLECT_LIST(sp) AS sp_list,
    COLLECT_LIST(latitude) AS lt_list,
    COLLECT_LIST(longitude) AS ln_list
FROM table_temp
GROUP BY date, hour, trip_id
```
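If you prefer the DataFrame API over Spark SQL, a rough sketch of the same aggregation looks like this; `df` and the input path are placeholders standing in for the data behind table_temp:

```
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Placeholder input, standing in for the data registered as table_temp.
df = spark.read.parquet("s3a://<bucket>/<input-prefix>/")

result = (
    df.groupBy("date", "hour", "trip_id")
      .agg(
          F.first("user_id").alias("user_id"),
          F.first("vehicle_id").alias("vehicle_id"),
          F.count("ct").alias("total_driving_time"),
          F.collect_list("ct").alias("ct_list"),
          F.collect_list("sp").alias("sp_list"),
          F.collect_list("latitude").alias("lt_list"),
          F.collect_list("longitude").alias("ln_list"),
      )
)
```

The resulting DataFrame can then be written out the same way as in your current script.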