AWS Glue Pyspark 作业尚未结束

问题描述 投票:0回答:1

我正在尝试使用以下命令通过

RDS Postgres
PySpark
3.3
版本从
AWS Glue 5.0
读取数据。

df = (
                self.config.spark_details.spark.read.format("jdbc")
                .option(
                    "url",
                    f"jdbc:postgresql://{self.postgres_host}:{self.postgres_port}/{self.postgres_database}",
                )
                .option("driver", "org.postgresql.Driver")
                .option("user", self.postgres_username)
                .option("password", self.postgres_password)
                .option("query", query)
                .load()
            )

现在,我想将这些数据写入

S3
。为此,我尝试了下面的代码片段:

final_df.write.partitionBy("year","month","day").mode("append").parquet(s3_path)

但这给了我一个

executor heartbeat error
。后来调试后才知道这可能是因为数据没有重新分区,所以在写入数据的时候加上了
repartition(10000)
。这似乎有效,但工作 5 小时后仍未完成。没有错误,所以我不得不停止工作。

进一步调试后,我发现当从DB加载数据时,它被加载到单个分区中。所以,无论我添加多少个执行者,它们都是没有用的。 enter image description here

没有任何转变。我只是应该以分区的方式读取和写入数据。数据大小将小于

100 GB
。我使用的是 Glue 5.0 版本,带有
3 workers(12 DPUS)
类型。
相同的代码适用于 

G4.X worker

记录,但会导致

38 M
记录出现问题。我还查看了驱动程序和执行程序日志,我看到如下所示的内容:

.enter image description here 还有这个
enter image description here 我不明白。由于只有 1 个分区加载数据,为什么我可以在日志中看到多个分区正在处理?

我在这里缺少什么?任何提示将不胜感激。即使现在工作已经运行了 2 小时但还没有完成。

TIA


amazon-web-services apache-spark pyspark
1个回答
0
投票

您需要在阅读器中设置 JDBC 选项:

59 M

现在 Spark 知道如何将查询拆分到执行器中。如果您没有要分区的列,您可以使用 
.option("partitionColumn", "id") -> the column spark is going to divide()<br> .option("lowerBound", "1") -> the lowest possible value<br> .option("upperBound", "1000") -> the highest possible value<br> .option("numPartitions", "10") -> number of partitions you want

创建一个列,例如在自定义查询中。

    

© www.soinside.com 2019 - 2024. All rights reserved.