我尝试在spark.read.jdbc中使用谓词,它使读取速度明显变慢。
我有一个 postgres 表可以读入 Spark。我尝试了以下两个选项:
选项1——使用upperBound、lowerBound、numPartitions
df = spark.read.format("jdbc")\
.option("url",f"jdbc:postgresql://{host}:{port}/{dbname}")\
.option("dbtable",dbtable)\
.option("user",user)\
.option("password",password)\
.option("partitionColumn","person_id")\
.option("numPartitions",2)\
.option("lowerBound",261461)\
.option("upperBound",29283487)\
.load()
df.count()
选项2——使用谓词
df = spark.read.jdbc(
url = f"jdbc:postgresql://{host}:{port}/{dbname}",
table = dbtable,
predicates = ["(person_id < 14772474 OR person_id is null)", "(person_id >= 14772474)"],
properties = {"user": f"{user}", "password": f"{password}"}
)
df.count()
我希望这两个选项应该有相似的执行时间。 但是,选项 1 需要 10 秒,而选项 2 需要 40 秒。
最简单的是查看 PostgreSQL DB 的查询历史记录,以了解 PySpark 通过 JDBC 执行了哪些查询。
选项 1 并行执行两个查询:
select * from dbtable where person_id between 261461 and 14511013
select * from dbtable where person_id between 14511014 and 29283487
记住
(29283487 - 261461) / 2 = 14511013
对于选项 2 的作用有两种猜测,我的钱花在#1 上:
select * from dbtable where <your predicates>
select * from dbtable
,获取所有结果,然后在 Spark 中应用谓词/过滤器。