我正在 Databricks 中运行 Apache Spark,从 MariaDB 检索数据。我有包含设备 UUID 的 IoT 文件,我需要检索有关用户的其他信息来进行分析。但是,如果我“正常”针对数据库运行 SQL 查询,我会得到我期望的数据,但是当我从 Spark 运行它时,我会得到完全不同的数据。
query = """SELECT
d.uuid,
ch.gender AS sex,
ch.cpp,
ch.young_carer,
ch.spp,
ch.asylum_refugee
FROM
devices d
LEFT JOIN device_sessions ds ON ds.device_id = d.id
LEFT JOIN children AS ch ON ds.person_id = ch.id"""
devices = spark.read.format("jdbc").option("url", jdbcUrl).option("query", query).option("user", connectionProperties["user"]).option("useSSL", connectionProperties["useSSL"]).option("driver", connectionProperties["driver"]).option("serverSslCert", connectionProperties["serverSslCert"]).option("trustServerCertificate", connectionProperties["trustServerCertificate"]).option("password", jdbcPassword).load()
devices.show(10)
当我运行它时,它是上面的 Spark,我得到
+----+---+----+-----------+----+--------------+
|uuid|sex| cpp|young_carer| spp|asylum_refugee|
+----+---+----+-----------+----+--------------+
|uuid|sex|true| true|true| true|
|uuid|sex|true| true|true| true|
|uuid|sex|true| true|true| true|
|uuid|sex|true| true|true| true|
|uuid|sex|true| true|true| true|
|uuid|sex|true| true|true| true|
|uuid|sex|true| true|true| true|
|uuid|sex|true| true|true| true|
|uuid|sex|true| true|true| true|
|uuid|sex|true| true|true| true|
+----+---+----+-----------+----+--------------+
only showing top 10 rows
如果我在 Spark 之外使用此查询数据库,
SELECT
d.uuid,
ch.gender AS sex,
ch.cpp,
ch.young_carer,
ch.spp,
ch.asylum_refugee
FROM
devices d
LEFT JOIN device_sessions ds ON ds.device_id = d.id
LEFT JOIN children AS ch ON ds.person_id = ch.id
|uuid|sex| cpp|young_carer| spp|asylum_refugee|
|0002deff-64ef-47b8-a538-14ac4b824e2f |F |0| 0| 0| 0|
|000d63b1-d4e3-454e-865e-08fb24a14d0e |M |0| 0| 0| 0|
我期待 Spark 中得到同样的结果。这两行中存在所有值。也许问题是除 UUID 之外的某些行有空值?或者我需要指定类型的架构?
'children.gender' 是 varchar(255)。其他“子”列是tinyint(1) = 0。 “devices.uuid”是 varchar(255)。 连接 ID 均为 bigint(20)。 (但是,我无法检索任何 bigint(20) 值,因为我收到“long is out of range”错误,因此我在查询中避免了这种情况。
这对我来说毫无意义。为什么在不同的上下文中运行时结果如此不同。如果有人有线索,我将非常感激。谢谢。
进一步简化,我只取两列,以防空值混淆其他列。
query = """SELECT
d.`uuid`,
ch.`gender`
FROM
devices d
LEFT JOIN device_sessions ds ON ds.`device_id` = d.`id`
LEFT JOIN children AS ch ON ds.`person_id` = ch.`id`"""
我明白了
+----+------+
|uuid|gender|
+----+------+
|uuid|gender|
|uuid|gender|
|uuid|gender|
|uuid|gender|
|uuid|gender|
+----+------+
我发现有人使用 pyspark 和 Hive 提出了类似的问题,但也没有答案。
解决了。出于隐私考虑,我没有在问题中显示网址和其他连接详细信息,但实际上这就是问题所在。为此设置的 URL 以“jdbc:mariadb”开头,但 Spark 无法识别它。即使它是 mariadb 驱动程序,它也必须是“jdbc:mysql”。一旦更改,值就会按预期出现。
jdbcHostname = "hostname.mariadb.database.azure.com"
jdbcDatabase = "database_name"
jdbcPort = 3306
jdbcUrl = "jdbc:mysql://{0}:{1}/{2}".format(jdbcHostname, jdbcPort, jdbcDatabase)
jdbcPassword = dbutils.secrets.get(scope = "iot-blob", key = "password")
connectionProperties = {
# verbose ssl properties are all required
"useSSL" : "true",
"trustServerCertificate" : "true",
"serverSslCert" : "/FileStore/azure_mariadb_ca.pem",
#I had to install the mariaDB driver to get a connection to work
"driver" : "org.mariadb.jdbc.Driver",
#Credentials live here with pw passed in through databricks scopes
"user" : "username@host",
"password" : jdbcPassword
}
result = spark.read.format("jdbc").option("url", jdbcUrl) \
.option("query", query) \
.option("user", connectionProperties["user"]) \
.option("useSSL", connectionProperties["useSSL"]) \
.option("driver", connectionProperties["driver"]) \
.option("serverSslCert", connectionProperties["serverSslCert"]) \
.option("trustServerCertificate", connectionProperties["trustServerCertificate"]) \
.option("password", jdbcPassword).load()
我也面临同样的问题。折腾了一整天,终于解决了。主要原因是 Spark 无法识别 JDBC URL 中的“jdbc.mariadb”。要解决此问题,您可以将“?permitMysqlScheme”附加到 JDBC URL 的末尾。这使得 Spark 可以将 MariaDB 与 MySQL 方案兼容。出于安全原因,我没有添加 MariaDB 凭据。
这里是如何使用 PySpark 从 MariaDB 读取表的详细示例,假设已经建立了 Spark 连接。此代码包含从 MariaDB 读取数据所需的配置:
host_name = 'your_mariadb_host_name'
database_name = 'your_mariadb_database_name'
user_name = 'your_mariadb_user_name'
password = 'your_password_for_mariadb_user_name'
port = '3306' # default port for Mysql
sql = "SELECT * FROM your_table" # You can specify any SQL query here
jdbc_url = f"jdbc:mysql://{host_name}:{port}/{database_name}?permitMysqlScheme"
df = spark.read \
.format("jdbc") \
.option("driver", "org.mariadb.jdbc.Driver") \
.option("url", jdbc_url) \
.option("query", sql) \
.option("user", user_name) \
.option("password", password) \
.load()
df.show(truncate=False) # showing the dataframe