SparkSQL JDBC(PySpark)到Postgres-创建表和使用CTE

问题描述 投票:0回答:1

我正在一个项目中,将Python概念验证(POC)移植到PySpark。 POC大量利用了Postgres,尤其是PostGIS地理空间库。大多数工作包括Python在回发数据进行最终处理之前向Postgres发出命令。

传递给Postgres的某些查询包含CREATE TABLEINSERTCREATE TEMP TABLE和CTE WITH语句。我正在尝试确定是否可以通过JDBC将这些查询从Spark传递给Postgres。

有人可以确认此功能在Spark JDBC中是否可用于其他数据库?明确地说,我想将纯英语SQL查询传递给Postgres,而不使用可用的SparkSQL API(因为它们不支持我需要的所有操作)。我正在使用Spark版本2.3.0PostgreSQL 10.11Python 2.7.5(是的,我知道Python 2的EOL,这是另一个故事)。

这是到目前为止我尝试过的:

使用SparkSession.read

创建与Postgres的Spark会话

postgres = SparkSession.builder \
    .appName("myApp") \
    .config("spark.jars", "/usr/share/java/postgresql-jdbc.jar") \
    .getOrCreate()

定义要传递给dbtable参数的查询

qry = """create table test (name varchar(50), age int)"""

qry传递给Postgres spark会话对象的dbtable参数

postgres.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://....) \
    .option("dbtable", qry) \
    .option("user", configs['user']) \
    .option("password", configs['password']) \
    .option("driver", "org.postgresql.Driver") \
    .option("ssl", "true") \
    .load()

将返回以下语法错误(使用上面列出的其他SQL命令时,会产生相同类型的错误:]

Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 367, in <module>
    raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 360, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 9, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/readwriter.py", line 172, in load
    return self._df(self._jreader.load())
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
    format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o484.load.
: org.postgresql.util.PSQLException: ERROR: syntax error at or near "create"
  Position: 15

使用SparkSession.sql()

利用上面定义的相同的postgres对象

将查询传递到.sql()

postgres.sql("""create table (name varchar(50), age int)""")

哪个返回以下解析异常:

Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 367, in <module>
    raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 360, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 714, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 73, in deco
    raise ParseException(s.split(': ', 1)[1], stackTrace)
ParseException: u"\nno viable alternative at input 'create table ('(line 1, pos 13)\n\n== SQL ==\ncreate table (name varchar(50), age int)\n-------------^^^\n"

并且如果我将查询用引号括起来,如postgres.sql("(create table (name varchar(50), age int))"),那么我将得到另一个解析异常,这使我相信无法实现我想要的功能:

Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 367, in <module>
    raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 360, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 714, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 73, in deco
    raise ParseException(s.split(': ', 1)[1], stackTrace)
ParseException: u"\nextraneous input 'create' expecting {'(', 'SELECT', 'FROM', 'VALUES', 'TABLE', 'INSERT', 'MAP', 'REDUCE'}(line 1, pos 1)\n\n== SQL ==\n(create table (name varchar(50), age int))\n-^^^\n"

我的问题归结为:

  1. 我的方法是否缺少某种配置或其他必要步骤?
  2. Postgres是否可以利用spark.sql() API?
  3. 我正在努力实现的目标吗?

我在互联网上进行搜索,以查找使用SparkSQL向PostgreSQL发出这类SQL查询的示例,但未找到任何解决方案。如果有解决方案,我希望看到一个例子,否则,确认这是不可能的,这将绰绰有余。

python postgresql apache-spark jdbc pyspark
1个回答
0
投票

我正在努力实现的目标吗?

我会说不。 Spark是用于数据处理的框架,因此它的API主要是为使用数据源进行readwrite操作而开发的。就您而言,您有一些DDL语句,Spark不应执行此类操作。

例如,第一个示例中的dbtable选项必须是表名或某些SELECT查询。

如果您需要运行某些DDL,DCL,TCL查询,则应以其他方式执行此操作,例如通过psycopg2模块。

可以通过Postgres以某种方式利用spark.sql()API吗?

spark.sql是一种在SparkSession表或视图中注册的对象上执行SparkSQL代码的方法。它可以与任何受支持的数据源一起使用,不仅可以使用jdbc,而且还可以使用SparkSQL语法在Spark端使用。例如

val spark = SparkSession
        ...
        .getOrCreate()

spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://ip/database_name")
  .option("dbtable", "schema.tablename")
  .load()
  .createOrReplaceTempView("my_spark_table_over_postgresql_table")

// and then you can operate with a view:
val df = spark.sql("select * from my_spark_table_over_postgresql_table where ... ")
© www.soinside.com 2019 - 2024. All rights reserved.