如何使用 AWS Glue 和 Spark 创建包含包含空值的对象数组的 JSON?

问题描述 投票:0回答:1

我正在使用 AWS Glue 和 Apache Spark 开发数据转换管道。我的目标是创建一个包含对象数组的 JSON 输出,其中包含所有字段,即使它们包含

null
值。

目前,输出格式如下:

{
    "identifier": "1015604061WS",
    "values": { ... }
}
{
    "identifier": "1019502238WS",
    "values": { ... }
}

但是我需要结果显式包含数组内的所有对象,如下所示:

[
    {
        "identifier": "1015604061WS",
        "values": { ... }
    },
    {
        "identifier": "1019502238WS",
        "values": { ... }
    }
]

这是我目前的方法:

spark = glueContext.spark_session
spark = SparkSession.builder \
    .appName("GlueJob") \
    .config("spark.sql.jsonGenerator.ignoreNullFields", "false") \
    .getOrCreate()
job = Job(glueContext)

def map_unit(unit_column):
    return when(col(unit_column) == 'M', 'METER') \
           .when(col(unit_column) == 'KG', 'KILOGRAM') \
           .when(col(unit_column) == 'M3', 'CUBIC_METER') \
           .when(col(unit_column) == 'MM', 'MILLIMETER') \
           .otherwise(col(unit_column))

def create_structure(column_name, unit_column):
    return when(
        (col(column_name) != 0) & (col(unit_column).isNotNull()) & (col(unit_column) != ""),
        array(struct(
            lit(None).alias("locale"),
            lit(None).alias("scope"),
            struct(
                col(column_name).cast("float").alias("amount"), 
                map_unit(unit_column).alias("unit")
            ).alias("data")
        ))
    ).otherwise(None)

def create_currency_structure(column_name, currency_column):
    return when(
        (col(column_name) != 0) & (col(currency_column).isNotNull()) & (col(currency_column) != ""),
        array(struct(
            lit(None).alias("locale"),
            lit(None).alias("scope"),
            array(struct(col(column_name).cast("float").alias("amount"), col(currency_column).alias("currency"))).alias("data")
        ))
    ).otherwise(None)

database_name = "project_example"
table_name = "input"
df = glueContext.create_dynamic_frame.from_catalog(
    database=database_name, 
    table_name=table_name,
    transformation_ctx="catalog_node"
).toDF()

df = df.filter(col('material').isNotNull()).withColumnRenamed('material', 'identifier')

df = df.withColumn('erp_name', array(struct(
    when(lit("de_DE") == "", None).otherwise(lit("de_DE")).alias("locale"),
    lit(None).alias("scope"),
    col('materialkurztext').alias("data")
)))

df = df.withColumn('ean13', array(struct(
    lit(None).alias("locale"),
    lit(None).alias("scope"),
    regexp_replace(col('ean13'), "^'", "").cast('string').alias("data")
)))

df = df.withColumn('internal_note', array(struct(
    lit("de_DE").alias("locale"),
    lit(None).alias("scope"),
    col('`int.Vermerk`').alias("data")  # Usar backticks para nombres con caracteres especiales
)))

df = df.withColumn('expiry_date', array(struct(
    lit(None).alias("locale"),
    lit(None).alias("scope"),
    col('auslaufdatum').cast("string").alias("data")
)))

df = df.withColumn('country_iso_code', array(struct(
    lit(None).alias("locale"),
    lit(None).alias("scope"),
    col('ursprungsland').alias("data")
)))

df = df.withColumn('material_type', array(struct(
    lit(None).alias("locale"),
    lit(None).alias("scope"),
    col('materialart').alias("data")
)))

df = df.withColumn('packaging_quantity', array(struct(
    lit(None).alias("locale"),
    lit(None).alias("scope"),
    col('verpackungsmenge').alias("data")
)))

df = df.withColumn('pruftext', array(struct(
    lit("de_DE").alias("locale"),
    lit(None).alias("scope"),
    col('prüftext').alias("data")
)))

df = df.withColumn('ean128', array(struct(
    lit("de_DE").alias("locale"),
    lit(None).alias("scope"),
    regexp_replace(col('ean128'), "^'", "").cast('string').alias("data")
)))

df = df.withColumn('lhm_menge', array(struct(
    lit(None).alias("locale"),
    lit(None).alias("scope"),
    regexp_replace(col('`pal.-Mng.`'), "^'", "").cast('string').alias("data")
)))

df = df.withColumn('rotho_colors', array(struct(
    lit(None).alias("locale"),
    lit(None).alias("scope"),
    array(regexp_replace(col('Farbcode'), "^'", "").cast('string')).alias("data")  # Eliminar ' al inicio
)))

df = df.withColumn('inside_length', create_structure('Innenlänge', 'Innenmaßeinheit')) \
       .withColumn('inside_width', create_structure('Innenbreite', 'Innenmaßeinheit')) \
       .withColumn('inside_height', create_structure('Innenhöhe', 'Innenmaßeinheit')) \
       .withColumn('diagonal', create_structure('Diagonale', 'Innenmaßeinheit'))

df = df.withColumn('gross_weigth', create_structure('bruttogewicht st', 'gewichtseinheit st')) \
       .withColumn('nett_weigth', create_structure('nettogewicht st', 'gewichtseinheit st'))

df = df.withColumn('uvp', create_currency_structure('unverbindliche preisempfehlung', 'uvp w'))

df = df.withColumn('volume', create_structure('volumen st', 'volumeneinheit st'))

df = df.withColumn('length', create_structure('länge st', 'abmessungseinheit st')) \
       .withColumn('width', create_structure('breite st', 'abmessungseinheit st')) \
       .withColumn('height', create_structure('höhe st', 'abmessungseinheit st'))

df = df.withColumn('length_vpe', create_structure('länge vpe', 'abmessungseinheit vpe')) \
       .withColumn('width_vpe', create_structure('breite vpe', 'abmessungseinheit vpe')) \
       .withColumn('heigth_vpe', create_structure('höhe vpe', 'abmessungseinheit vpe'))

df = df.withColumn('gross_weigth_vpe', create_structure('bruttogewicht vpe', 'gewichtseinheit vpe')) \
       .withColumn('nett_weigth_vpe', create_structure('vpe nettogewicht', 'gewichtseinheit vpe'))

df = df.withColumn("values", struct(
    col('erp_name').alias("erp_name"),
    when(col('inside_length').isNotNull(), col('inside_length')).alias("inside_length"),
    when(col('inside_width').isNotNull(), col('inside_width')).alias("inside_width"),
    when(col('inside_height').isNotNull(), col('inside_height')).alias("inside_height"),
    when(col('diagonal').isNotNull(), col('diagonal')).alias("diagonal"),
    when(col('gross_weigth').isNotNull(), col('gross_weigth')).alias("gross_weigth"),
    when(col('nett_weigth').isNotNull(), col('nett_weigth')).alias("nett_weigth"),
    col('ean13').alias("ean13"),
    when(col('uvp').isNotNull(), col('uvp')).alias("uvp"),
    col('internal_note').alias("internal_note"),
    col('expiry_date').alias("expiry_date"),
    when(col('volume').isNotNull(), col('volume')).alias("volume"),
    col('country_iso_code').alias("country_iso_code"),
    col('material_type').alias("material_type"),
    when(col('length').isNotNull(), col('length')).alias("length"),
    when(col('width').isNotNull(), col('width')).alias("width"),
    when(col('height').isNotNull(), col('height')).alias("height"),
    col('packaging_quantity').alias("packaging_quantity"),
    col('pruftext').alias("pruftext"),
    col('ean128').alias("ean128"),
    when(col('length_vpe').isNotNull(), col('length_vpe')).alias("length_vpe"),
    when(col('width_vpe').isNotNull(), col('width_vpe')).alias("width_vpe"),
    when(col('heigth_vpe').isNotNull(), col('heigth_vpe')).alias("heigth_vpe"),
    when(col('gross_weigth_vpe').isNotNull(), col('gross_weigth_vpe')).alias("gross_weigth_vpe"),
    when(col('nett_weigth_vpe').isNotNull(), col('nett_weigth_vpe')).alias("nett_weigth_vpe"),
    # col('lhm_menge').alias("lhm_menge"),
    col('rotho_colors').alias("rotho_colors")
))

df_transformed = df.select('identifier', 'values')

output_path = "s3://project_example/export"

df_transformed.write.json(output_path, mode='overwrite', ignoreNullFields=False)

job.commit() 

如何确保 JSON 输出包含数组格式的所有对象,同时保留

null
值?

环境:

  • AWS Glue 版本:4.0
  • Python 3

任何指导或示例将不胜感激!

apache-spark pyspark etl aws-glue
1个回答
0
投票

我在尝试从 AWS Glue 中的 DataFrame 创建 JSON 文件时遇到了该解决方案,其中我需要过滤掉为空的特定字段。这是通过从每行的值字段中动态删除具有空值的字段来解决问题的代码。

解决方案如下:

定义要检查的字段:我列出了所有应从最终 JSON 中排除的字段(如果它们具有空值)。 过滤 null 字段的函数:filter_null_fields 函数将每一行转换为字典,并根据预定义的字段列表 (fields_to_check) 删除为 null 的字段。 数据转换和 JSON 导出:然后将转换后的数据转换为 JSON 字符串并使用 boto3 上传到 S3。

import json
import boto3
from pyspark.sql import functions as F

# Define the fields that should be removed if they are null
fields_to_check = [
    'inside_length', 'inside_width', 'inside_height', 'diagonal',
    'gross_weigth', 'nett_weigth', 'uvp', 'expiry_date',
    'volume', 'length', 'width', 'height',
    'length_vpe', 'width_vpe', 'heigth_vpe',
    'gross_weigth_vpe', 'nett_weigth_vpe'
]

def filter_null_fields(row):
    """ 
    Function to remove fields from 'values' that are null 
    for the fields defined in 'fields_to_check'
    """
    row_dict = row.asDict(recursive=True)  # Convert the row to a dictionary
    values = row_dict.get('values', {})
    
    # Remove fields that are null
    filtered_values = {k: v for k, v in values.items() if k not in fields_to_check or v is not None}
    row_dict['values'] = filtered_values
    
    return row_dict

# Select the necessary fields from the DataFrame
df_transformed = df.select('identifier', 'values')

# Convert the DataFrame into a list of dictionaries, filtering out null fields
data_list = [filter_null_fields(row) for row in df_transformed.collect()]

# Convert the list of dictionaries into a JSON string with an array format
json_array = json.dumps(data_list, indent=2)

# Define the output bucket and path in S3
output_bucket = "project_example"
output_path = "export/output.json"

# Use boto3 to write the JSON file to S3
s3 = boto3.client('s3')

# Upload the JSON file to S3
s3.put_object(
    Bucket=output_bucket,
    Key=output_path,
    Body=json_array,
    ContentType='application/json'
)

# Commit the job
job.commit()

使用这种方法,列表 (fields_to_check) 中任何为 null 的字段都将从最终 JSON 的值部分中完全排除。然后 JSON 文件以所需的格式成功上传到 S3。

这解决了我的问题,希望对你也有帮助!

© www.soinside.com 2019 - 2024. All rights reserved.