Convert a parquet column to JSON

Problem description

I have a parquet file, coming from SQL Server, with the following schema:

root
 |-- user_uid: string (nullable = true)
 |-- user_email: string (nullable = true)
 |-- ud_id: integer (nullable = true)
 |-- ud_standard_workflow_id: integer (nullable = true)
 |-- ud_is_preview: boolean (nullable = true)
 |-- ud_is_completed: boolean (nullable = true)
 |-- ud_language: string (nullable = true)
 |-- ud_created_date: timestamp (nullable = true)
 |-- ud_modified_date: string (nullable = true)
 |-- ud_created_by_id: string (nullable = true)
 |-- dsud_id: integer (nullable = true)
 |-- dsud_user_data_id: integer (nullable = true)
 |-- dsud_dynamic_step_id: integer (nullable = true)
 |-- dsud_is_completed: boolean (nullable = true)
 |-- dsud_answers: string (nullable = true)

The last column, dsud_answers, is a string, but it contains JSON data in the form of a list:

[{"QuestionId":6406,"QuestionTitle":"Residency","Value":"1975"},{"QuestionId":6407,"QuestionTitle":"Citizentship","Value":"66664"}]

How do I convert this column to a proper JSON data type? I keep getting the error: data type mismatch: input schema "STRING" must be a struct, an array or a map.

The result I want is the dsud_answers column as a JSON data type, so that I can flatten its contents. In this example that would give 2 records, because the JSON contains 2 QuestionIds.

I managed to do the conversion in pandas, but I can't figure out the PySpark way to do it. I convert the PySpark DataFrame to a pandas DataFrame and then loop over all the rows.

import json

import pandas as pd
from pyspark.sql.functions import to_timestamp

def batch_function(df_answers, batch_id):

    # Drop rows with an empty answer list, fix the timestamp column,
    # and move the result to pandas.
    df = (
        df_answers
        .filter(df_answers.dsud_answers != '[]')
        .withColumn("ud_modified_date", to_timestamp(df_answers.ud_modified_date))
        .drop_duplicates()
        .toPandas()
    )


    df_attributes = pd.DataFrame()
    df_final = pd.DataFrame()
    # Loop through the data to fill the dataframe
    for index in df.index:

        indexId = df.dsud_id[index]
        userDataId = df.dsud_user_data_id[index]
        dynamicStepId = df.dsud_dynamic_step_id[index]
        languageID = df.ud_language[index]
        createdDate = pd.to_datetime(df.ud_created_date[index])
        createdBy = df.ud_created_by_id[index]
        modifiedDate = df.ud_modified_date[index]
        email = df.user_email[index]
        workflowId = df.ud_standard_workflow_id[index]
        uid = df.user_uid[index]
        completed = df.wrn_is_completed[index]
        agreed = df.wrn_is_agreed[index]
        flow_name = df.wf_name[index]

        # Flatten this row's JSON answers: one record per question
        row_json = json.loads(df.dsud_answers[index])
        df_attributes = pd.json_normalize(row_json)

        # Attach the parent row's fields to every flattened record
        df_attributes['dsud_user_data_id'] = userDataId
        df_attributes['dsud_id'] = indexId
        df_attributes['dsud_dynamic_step_id'] = dynamicStepId
        df_attributes['ud_language'] = languageID
        df_attributes['ud_created_date'] = createdDate
        df_attributes['ud_created_by_id'] = createdBy
        df_attributes['ud_modified_date'] = modifiedDate
        df_attributes['user_email'] = email
        df_attributes['ud_standard_workflow_id'] = workflowId
        df_attributes['user_uid'] = uid
        df_attributes['wrn_is_completed'] = completed
        df_attributes['wrn_is_agreed'] = agreed
        df_attributes['wf_name'] = flow_name

        df_final = pd.concat([df_final, df_attributes], ignore_index=True)

    df_answers = spark.createDataFrame(df_final)

    df_answers.write.mode("append").format("delta").saveAsTable("final_table")  
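As an aside, the per-row loop can usually be replaced by pandas' own explode plus json_normalize, which avoids the repeated concat. A sketch under the same column names (flatten_answers and the sample frame are illustrative, not part of the original code):

```python
import json

import pandas as pd

def flatten_answers(df: pd.DataFrame) -> pd.DataFrame:
    """Explode the JSON list in dsud_answers into one row per question."""
    out = df.copy()
    # Parse each JSON string into a Python list of dicts
    out["dsud_answers"] = out["dsud_answers"].apply(json.loads)
    # One row per list element; parent columns are repeated automatically
    out = out.explode("dsud_answers", ignore_index=True)
    # Spread each question dict into its own columns
    answers = pd.json_normalize(out["dsud_answers"].tolist())
    return pd.concat([out.drop(columns=["dsud_answers"]), answers], axis=1)

sample = pd.DataFrame({
    "dsud_id": [100],
    "dsud_answers": ['[{"QuestionId":6406,"QuestionTitle":"Residency","Value":"1975"},'
                     '{"QuestionId":6407,"QuestionTitle":"Citizentship","Value":"66664"}]'],
})
flat = flatten_answers(sample)
# flat has two rows: columns dsud_id, QuestionId, QuestionTitle, Value
```

Because explode repeats the parent columns for every list element, there is no need to copy the metadata fields by hand inside a loop.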
Tags: json, pyspark, azure-databricks
1 Answer

To convert the dsud_answers column from string to a JSON data type in PySpark:

df.printSchema()
root
 |-- dsud_answers: string (nullable = true)
 |-- dsud_dynamic_step_id: long (nullable = true)
 |-- dsud_id: long (nullable = true)
 |-- dsud_is_completed: boolean (nullable = true)
 |-- dsud_user_data_id: long (nullable = true)
 |-- ud_created_by_id: string (nullable = true)
 |-- ud_created_date: string (nullable = true)
 |-- ud_id: long (nullable = true)
 |-- ud_is_completed: boolean (nullable = true)
 |-- ud_is_preview: boolean (nullable = true)
 |-- ud_language: string (nullable = true)
 |-- ud_modified_date: string (nullable = true)
 |-- ud_standard_workflow_id: long (nullable = true)
 |-- user_email: string (nullable = true)
 |-- user_uid: string (nullable = true)

I tried the following approach:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.types import ArrayType
schema = ArrayType(StructType([
    StructField("QuestionId", IntegerType()),
    StructField("QuestionTitle", StringType()),
    StructField("Value", StringType())
]))
df = df.withColumn("dsud_answers", from_json(col("dsud_answers"), schema))
df.show(truncate=False)

Result:

dsud_answers    dsud_dynamic_step_id    dsud_id dsud_is_completed   dsud_user_data_id   ud_created_by_id    ud_created_date ud_id   ud_is_completed ud_is_preview   ud_language ud_modified_date    ud_standard_workflow_id user_email  user_uid
[{"QuestionId":6406,"QuestionTitle":"Residency","Value":"1975"},{"QuestionId":6407,"QuestionTitle":"Citizenship","Value":"66664"}]  201 100 false   101 user1   2022-01-01 10:00:00 1   false   true    en  2022-01-02 09:00:00 1001    [email protected]   12345
[{"QuestionId":6408,"QuestionTitle":"Education","Value":"Bachelor"},{"QuestionId":6409,"QuestionTitle":"Occupation","Value":"Engineer"}]    202 200 true    201 user2   2022-01-03 14:00:00 2   true    false   es  2022-01-04 10:00:00 1002    [email protected]   67890


In the code above, I define a schema for the JSON data and use it to convert the dsud_answers column to a JSON data type. The JSON represents an array of objects, each containing three fields: QuestionId (integer), QuestionTitle (string) and Value (string). Using the withColumn method, I transform the DataFrame (df) by parsing the JSON column named "dsud_answers" into the structured format defined by the schema. The result is a new column named "dsud_answers" containing the parsed JSON data.
