在下图中,我们使用三种不同配置运行相同的粘合作业,以了解如何写入 S3:
代码差异是这样的:
if dynamic:
df_final_dyn = DynamicFrame.fromDF(df_final, glueContext, "df_final")
glueContext.write_dynamic_frame.from_options(
frame=df_final_dyn, connection_type="s3", format="glueparquet", transformation_ctx="DataSink0",
connection_options={"path": "s3://...",
"partitionKeys": ["year", "month", "day"]})
else:
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
df_final.write.mode("overwrite").format("parquet").partitionBy("year", "month", "day")\
.save("s3://.../")
为什么效率这么低?
我参加聚会迟到了,但这里是根据我的经验提供的一些意见。
在 Glue 中从 S3 读取时可以进行一些优化。 这是来自 AWS 的一篇关于该主题的精彩文章:https://aws.amazon.com/blogs/big-data/optimize-memory-management-in-aws-glue/
这些是我们有幸使用动态框架实现稳定且高性能的 Glue 作业:
'useS3ListImplementation': True
'groupFiles': 'inPartition'
'groupSize': '<some-int>' // some value between 1048576 and 134217728
为了防止 Glue 中的内存故障并增加并行化,
groupSize
是一个关键配置。如果没有它,Glue 会以某种方式动态计算组大小,根据我的经验,当任务获取太多数据时,这可能会导致随机失败。
另一件事是让 Glue 和 Spark 测试更加等效:例如跳过“transformation_ctx”,因为它可能会导致一些开销。