Pyspark 以分布式方式循环遍历值

问题描述 投票:0回答:1

我有以下要求,需要使用 A

zure databricks
(
pyspark
) 进行开发。

  1. 调用
    Azure SQL
    数据库从表中读取列。
  2. 对于每个列值,请阅读上面的内容,调用
    REST API
    ,它会返回
    JSON
    响应。
  3. JSON
    转换为
    parquet
    并存储在
    ADLS2
    位置。

目前我尝试过的方法

  df = spark.read.format("jdbc")...
  idList = df.select("id").rdd.flatMap(lambda x: x).collect()
 
  finallist = []

  for id in idList :
     resp = requests.get(...)
     finallist.append(resp.text)

  df1 = spark.sparkContext.parallelize(finallist)   
  df2 = spark.read.json(df1)
  df2.write.mode("overwrite").parquet("abfs://....adls2 path...")

上述方法经过少量样本数据的测试,似乎运行良好。然而,当涉及到可扩展性时,我对上述方法有疑问。

从表中收集 id 后,我使用 for 循环遍历它们,将该 id 传递给我将要进行的其余 API 调用,然后将 API 响应附加到列表中。 这将在驱动程序上运行,因此不会以分布式方式运行。我将 API 响应添加到列表中,以便我可以将 API 响应并行化为分布式集合,稍后当我转换

 时可以在多个工作线程上运行JSON
Parquet

如何以分布式方式运行所有代码?处理我的用例的更好、更有效的方法是什么?也许我可以使用

foreach
来调用 API,但是如何以分布式方式将
JSON
响应转换为
Parquet

请注意,尽管

collect()
只会给驱动程序带来一个列值,但这对我来说不是瓶颈,因为驱动程序有足够的内存。

json pyspark azure-databricks parquet
1个回答
0
投票

我同意@samkart的评论。为了实现您的要求,您可以使用 UDF 将每个 API 响应作为 JSON 字符串存储在同一数据帧列中,然后使用

from_json()
从 JSON 字符串列中提取所需的值。

这是一个示例演示,其中我使用了数据框

df
和列
sql_id

+------+
|sql_id|
+------+
|     1|
|     2|
|     3|
|     4|
|     5|
+------+

例如,我使用了

JSON placeholder
中的 https://jsonplaceholder.typicode.com/todos/{<sql_id>} API,将
sql_id
传递给 UDF,UDF 将以字符串形式返回 JSON 响应。您需要在此使用您的 API。

import requests
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.functions import from_json,col

# Create UDF to get the JSON from API
def json_api(sql_id):
    try:
        response = requests.get(f"https://jsonplaceholder.typicode.com/todos/{sql_id}")
        if response.status_code == 200:
            return response.text
        else:
            return None
    except Exception as e:
        return None

# Set the return type as string
upi_udf = udf(json_api, StringType())

# creates new column named response with api json as json string
df_api_data = df.withColumn("response", upi_udf(df["sql_id"]))

df_api_data.display()

# Extract the JSON schema from the JSON string column
json_schema = spark.read.json(df_api_data.rdd.map(lambda row: row.response)).schema

# Use from_json() and give the schema
res_df = df_api_data.withColumn('new_json', from_json(col('response'), json_schema)).select('sql_id','new_json.*')

res_df.display()

将 JSON 字符串保存为新列后,使用

rdd
lambda
函数提取 JSON 模式,或者您可以构建 JSON 模式,因为您已经知道 JSON 结构。

现在,使用

new_json
创建 JSON 对象列
from_json()
并将此
json_schema
传递给它。然后,在此列上使用
select
从数据框中获取所需的列。

结果示例:

sql_id 已完成 id 标题 用户ID
1 1 delectus aut autem 1
2 2 quis ut nam facilis et officia qui 1
3 3 fugiat veniam minus 1
4 真实 4 et porro tempora 1
5 5 laboriosam mollitia et enim quasi adipisci quia Provident illum 1
© www.soinside.com 2019 - 2024. All rights reserved.