我有以下要求,需要使用 A
zure databricks
(pyspark
) 进行开发。
Azure SQL
数据库从表中读取列。REST API
,它会返回 JSON
响应。JSON
转换为 parquet
并存储在 ADLS2
位置。目前我尝试过的方法
df = spark.read.format("jdbc")...
idList = df.select("id").rdd.flatMap(lambda x: x).collect()
finallist = []
for id in idList :
resp = requests.get(...)
finallist.append(resp.text)
df1 = spark.sparkContext.parallelize(finallist)
df2 = spark.read.json(df1)
df2.write.mode("overwrite").parquet("abfs://....adls2 path...")
上述方法经过少量样本数据的测试,似乎运行良好。然而,当涉及到可扩展性时,我对上述方法有疑问。
从表中收集 id 后,我使用 for 循环遍历它们,将该 id 传递给我将要进行的其余 API 调用,然后将 API 响应附加到列表中。 这将在驱动程序上运行,因此不会以分布式方式运行。我将 API 响应添加到列表中,以便我可以将 API 响应并行化为分布式集合,稍后当我转换
时可以在多个工作线程上运行JSON
至 Parquet
。
如何以分布式方式运行所有代码?处理我的用例的更好、更有效的方法是什么?也许我可以使用
foreach
来调用 API,但是如何以分布式方式将 JSON
响应转换为 Parquet
。
请注意,尽管
collect()
只会给驱动程序带来一个列值,但这对我来说不是瓶颈,因为驱动程序有足够的内存。
我同意@samkart的评论。为了实现您的要求,您可以使用 UDF 将每个 API 响应作为 JSON 字符串存储在同一数据帧列中,然后使用
from_json()
从 JSON 字符串列中提取所需的值。
这是一个示例演示,其中我使用了数据框
df
和列 sql_id
。
+------+
|sql_id|
+------+
| 1|
| 2|
| 3|
| 4|
| 5|
+------+
例如,我使用了
JSON placeholder中的
https://jsonplaceholder.typicode.com/todos/{<sql_id>}
API,将 sql_id
传递给 UDF,UDF 将以字符串形式返回 JSON 响应。您需要在此使用您的 API。
import requests
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.functions import from_json,col
# Create UDF to get the JSON from API
def json_api(sql_id):
try:
response = requests.get(f"https://jsonplaceholder.typicode.com/todos/{sql_id}")
if response.status_code == 200:
return response.text
else:
return None
except Exception as e:
return None
# Set the return type as string
upi_udf = udf(json_api, StringType())
# creates new column named response with api json as json string
df_api_data = df.withColumn("response", upi_udf(df["sql_id"]))
df_api_data.display()
# Extract the JSON schema from the JSON string column
json_schema = spark.read.json(df_api_data.rdd.map(lambda row: row.response)).schema
# Use from_json() and give the schema
res_df = df_api_data.withColumn('new_json', from_json(col('response'), json_schema)).select('sql_id','new_json.*')
res_df.display()
将 JSON 字符串保存为新列后,使用
rdd
和 lambda
函数提取 JSON 模式,或者您可以构建 JSON 模式,因为您已经知道 JSON 结构。
现在,使用
new_json
创建 JSON 对象列 from_json()
并将此 json_schema
传递给它。然后,在此列上使用 select
从数据框中获取所需的列。
结果示例:
sql_id | 已完成 | id | 标题 | 用户ID |
---|---|---|---|---|
1 | 假 | 1 | delectus aut autem | 1 |
2 | 假 | 2 | quis ut nam facilis et officia qui | 1 |
3 | 假 | 3 | fugiat veniam minus | 1 |
4 | 真实 | 4 | et porro tempora | 1 |
5 | 假 | 5 | laboriosam mollitia et enim quasi adipisci quia Provident illum | 1 |