我正在尝试使用以下命令通过
RDS Postgres
PySpark
和 3.3
版本从 AWS Glue 5.0
读取数据。
df = (
self.config.spark_details.spark.read.format("jdbc")
.option(
"url",
f"jdbc:postgresql://{self.postgres_host}:{self.postgres_port}/{self.postgres_database}",
)
.option("driver", "org.postgresql.Driver")
.option("user", self.postgres_username)
.option("password", self.postgres_password)
.option("query", query)
.load()
)
现在,我想将这些数据写入
S3
。为此,我尝试了下面的代码片段:
final_df.write.partitionBy("year","month","day").mode("append").parquet(s3_path)
但这给了我一个
executor heartbeat error
。后来调试后才知道这可能是因为数据没有重新分区,所以在写入数据的时候加上了repartition(10000)
。这似乎有效,但工作 5 小时后仍未完成。没有错误,所以我不得不停止工作。
进一步调试后,我发现当从DB加载数据时,它被加载到单个分区中。所以,无论我添加多少个执行者,它们都是没有用的。
没有任何转变。我只是应该以分区的方式读取和写入数据。数据大小将小于
100 GB
。我使用的是 Glue 5.0 版本,带有 3 workers(12 DPUS)
类型。相同的代码适用于 G4.X worker
记录,但会导致
38 M
记录出现问题。我还查看了驱动程序和执行程序日志,我看到如下所示的内容:.
还有这个
。
我不明白。由于只有 1 个分区加载数据,为什么我可以在日志中看到多个分区正在处理?
我在这里缺少什么?任何提示将不胜感激。即使现在工作已经运行了 2 小时但还没有完成。
TIA
您需要在阅读器中设置 JDBC 选项:
59 M
现在 Spark 知道如何将查询拆分到执行器中。如果您没有要分区的列,您可以使用
.option("partitionColumn", "id") -> the column spark is going to divide()<br>
.option("lowerBound", "1") -> the lowest possible value<br>
.option("upperBound", "1000") -> the highest possible value<br>
.option("numPartitions", "10") -> number of partitions you want
创建一个列,例如在自定义查询中。