我想使用 JDBC 从 Postgresql 读取数据并将其存储在 pyspark dataframe 中。当我想使用 df.show()、df.take() 等方法预览数据框中的数据时,它们返回一个错误,指出原因为:java.lang.ClassNotFoundException: org.postgresql.Driver。但是 df.printschema() 会完美地返回数据库表的信息。 这是我的代码:
from pyspark.sql import SparkSession
spark = (
SparkSession.builder.master("spark://spark-master:7077")
.appName("read-postgres-jdbc")
.config("spark.driver.extraClassPath", "/opt/workspace/postgresql-42.2.18.jar")
.config("spark.executor.memory", "1g")
.getOrCreate()
)
sc = spark.sparkContext
df = (
spark.read.format("jdbc")
.option("driver", "org.postgresql.Driver")
.option("url", "jdbc:postgresql://postgres/postgres")
.option("table", 'public."ASSET_DATA"')
.option("dbtable", _select_sql)
.option("user", "airflow")
.option("password", "airflow")
.load()
)
df.show(1)
错误日志:
Py4JJavaError: An error occurred while calling o44.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 172.21.0.6, executor 1): java.lang.ClassNotFoundException: org.postgresql.Driver
Caused by: java.lang.ClassNotFoundException: org.postgresql.Driver
2021 年 7 月 24 日编辑 该脚本是在 JupyterLab 上与独立 Spark 集群分开的 docker 容器中执行的。
您没有使用正确的选项。 当阅读doc时,你会看到这个:
附加到驱动程序类路径之前的额外类路径条目。 注意:在客户端模式下,不能直接在应用程序中通过 SparkConf 设置此配置,因为此时驱动程序 JVM 已经启动。相反,请通过 --driver-class-path 命令行选项或在默认属性文件中设置。
此选项适用于驾驶员。这就是模式获取起作用的原因,它是在驱动程序端完成的操作。但是当您运行 Spark 命令时,该命令由工作人员(或执行人员)执行。他们还需要有
.jar
才能访问 postgres。
如果您的 postgres 驱动程序(“/opt/workspace/postgresql-42.2.18.jar”)不需要任何依赖项,那么您可以使用
spark.jars
将其添加到工作线程中 - 我知道 mysql 不需要依赖项,但是我从来没有尝试过 postgres。如果需要依赖项,那么最好使用 spark.jars.packages
选项直接从 maven 调用包。 (请参阅文档的链接以获取帮助)
您还可以尝试添加:
.config("spark.executor.extraClassPath", "/opt/workspace/postgresql-42.2.18.jar"
这样 jar 也包含在你的执行者中。
场景 - 将 Jupyterlab 与本地/服务器主机(已安装 postgreSQL)连接并在 postgreSQL 数据库中写入 JSON
df.write.format("jdbc").mode("append") \
.option("driver","org.postgresql.Driver") \
.option("url","jdbc:postgresql://localhost:5432/postgres") \
.option("dbtable","TABLENAME") \
.option("user","postgres") \
.option("password","PASSWORD") \
.save()
它对我有用