PySpark: join inside a for loop

Problem description · votes: 0 · answers: 1

I have 2 dataframes, products_df and categories_df:

products_df:
+------+------+------+-----------+
|region|laptop|mobile|conditioner|
+------+------+------+-----------+
| North|  L123|  M456|       C789|
|  West|  NULL|  M789|       C123|
|  NULL|  L456|  M123|       C456|
+------+------+------+-----------+

categories_df:
+------+--------------------+------------+-----------+
|region|        product_name|product_code|      class|
+------+--------------------+------------+-----------+
| North|      Laptop Model X|        L123|electronics|
| North|      Mobile Model Y|        M456|electronics|
|  NULL|      Mobile Model Z|        M789|electronics|
|  NULL|      Laptop Model Z|        L456|electronics|
|  NULL|      Mobile Model A|        M123|electronics|
|  West|  Conditioner Deluxe|        C123| appliances|
| North|     Conditioner Pro|        C789| appliances|
|  NULL|Conditioner Standard|        C456| appliances|
+------+--------------------+------------+-----------+
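
For reference, a minimal sketch that reproduces the two sample dataframes above (the SparkSession setup is assumed):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

products_df = spark.createDataFrame(
    [
        ("North", "L123", "M456", "C789"),
        ("West", None, "M789", "C123"),
        (None, "L456", "M123", "C456"),
    ],
    ["region", "laptop", "mobile", "conditioner"],
)

categories_df = spark.createDataFrame(
    [
        ("North", "Laptop Model X", "L123", "electronics"),
        ("North", "Mobile Model Y", "M456", "electronics"),
        (None, "Mobile Model Z", "M789", "electronics"),
        (None, "Laptop Model Z", "L456", "electronics"),
        (None, "Mobile Model A", "M123", "electronics"),
        ("West", "Conditioner Deluxe", "C123", "appliances"),
        ("North", "Conditioner Pro", "C789", "appliances"),
        (None, "Conditioner Standard", "C456", "appliances"),
    ],
    ["region", "product_name", "product_code", "class"],
)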

products_df contains product columns (laptop, mobile, conditioner) whose values are product codes. The requirement is to replace each product code with its product name.

For each product code, we need to look up its mapped product name in categories_df, based on the product class (electronics, appliances) and the region.

Besides products_df, we can have other dataframes with different product columns, but they all map to categories_df. For this I wrote the function below, which takes any products dataframe as dynamic input along with the constant categories_df. To map the product columns to their classes, I pass a dictionary:

from pyspark.sql import functions as F

product_class_mapping = {
    "laptop": "electronics",
    "mobile": "electronics",
    "conditioner": "appliances",
}

def set_product_name(products_df, categories_df, product_class_mapping):
    # One left join per product column: match on class and product code;
    # a NULL region in categories_df acts as a wildcard for any region.
    for product_col, product_class in product_class_mapping.items():
        products_df = (
            products_df.alias("p")
            .join(
                categories_df.alias("c"),
                (F.col("c.class") == product_class)
                & (F.col(f"p.{product_col}") == F.col("c.product_code"))
                & (F.col("c.region").isNull() | (F.col("p.region") == F.col("c.region"))),
                how="left",
            )
            .select(F.col("p.*"), F.col("c.product_name"))
            # Replace the code with the matched name; NULL codes stay NULL.
            .withColumn(
                product_col,
                F.when(F.col(product_col).isNotNull(), F.col("product_name")).otherwise(
                    F.col(product_col)
                ),
            )
            .drop("product_name")
        )

    print("Final mapped products_df:")
    products_df.show()
    return products_df
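
It is called like this (a sketch; the dataframes and mapping are the ones defined above):

mapped_df = set_product_name(products_df, categories_df, product_class_mapping)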

It produces the expected output, where each product_code is replaced by its product_name:

Final mapped products_df:
+------+--------------+--------------+--------------------+
|region|        laptop|        mobile|         conditioner|
+------+--------------+--------------+--------------------+
| North|Laptop Model X|Mobile Model Y|     Conditioner Pro|
|  West|          NULL|Mobile Model Z|  Conditioner Deluxe|
|  NULL|Laptop Model Z|Mobile Model A|Conditioner Standard|
+------+--------------+--------------+--------------------+

However, I wonder whether this is the optimal solution, given that it uses a for loop and performs one join per product column. Are there any alternatives we could consider?

apache-spark pyspark
1 Answer

0 votes

Melt the products dataframe

df = products_df.melt(
    ids=['region'],
    values=list(product_class_mapping),
    variableColumnName='product_name',
    valueColumnName='product_code'
)

# df.show()
# +------+------------+------------+
# |region|product_name|product_code|
# +------+------------+------------+
# | North|      laptop|        L123|
# | North|      mobile|        M456|
# | North| conditioner|        C789|
# |  West|      laptop|        null|
# |  West|      mobile|        M789|
# |  West| conditioner|        C123|
# |  null|      laptop|        L456|
# |  null|      mobile|        M123|
# |  null| conditioner|        C456|
# +------+------------+------------+
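
Note that DataFrame.melt is only available from Spark 3.4 onwards; on older versions the same unpivot can be written with the stack SQL function (a sketch, assuming the same column names as above):

n = len(product_class_mapping)
stack_args = ", ".join(f"'{c}', {c}" for c in product_class_mapping)
df = products_df.selectExpr(
    "region",
    f"stack({n}, {stack_args}) as (product_name, product_code)",
)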

Join with the product-to-class mapping to pull in the class

# Build a one-row-per-product mapping dataframe; broadcast it since it is tiny.
mapping = spark.createDataFrame(list(product_class_mapping.items()), ['product_name', 'class'])
df = df.join(F.broadcast(mapping), on='product_name', how='left')

# df.show()
# +------------+------+------------+-----------+
# |product_name|region|product_code|      class|
# +------------+------+------------+-----------+
# |      laptop| North|        L123|electronics|
# |      mobile| North|        M456|electronics|
# | conditioner| North|        C789| appliances|
# |      laptop|  West|        null|electronics|
# |      mobile|  West|        M789|electronics|
# | conditioner|  West|        C123| appliances|
# |      laptop|  null|        L456|electronics|
# |      mobile|  null|        M123|electronics|
# | conditioner|  null|        C456| appliances|
# +------------+------+------------+-----------+

Create the join condition following the mapping logic and join with categories_df

# A NULL region on either side acts as a wildcard; otherwise the regions must match.
join_cond = (
    (
        (df['region'] == categories_df['region'])
        | df['region'].isNull() | categories_df['region'].isNull()
    )
    & (df['class'] == categories_df['class'])
    & (df['product_code'] == categories_df['product_code'])
)
df = df.alias('l').join(categories_df.alias('r'), on=join_cond)
df = df.select('l.region', 'l.product_name', 'r.product_name')

# df.show()
# +------+------------+--------------------+
# |region|product_name|        product_name|
# +------+------------+--------------------+
# |  West| conditioner|  Conditioner Deluxe|
# |  null| conditioner|Conditioner Standard|
# | North| conditioner|     Conditioner Pro|
# | North|      laptop|      Laptop Model X|
# |  null|      laptop|      Laptop Model Z|
# |  null|      mobile|      Mobile Model A|
# | North|      mobile|      Mobile Model Y|
# |  West|      mobile|      Mobile Model Z|
# +------+------------+--------------------+
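
One caveat, which applies to the loop in the question as well: if a product code ever has both a region-specific row and a NULL-region row in categories_df, the wildcard condition matches both and duplicates the product row. A sketch that keeps only the most specific match, to be applied between the join and the select above (this step is not in the answer's code; column references assume the l/r aliases):

from pyspark.sql import Window

# Prefer an exact region match (sorts as 0) over the NULL-region wildcard (1).
w = Window.partitionBy(F.col('l.region'), F.col('l.product_name')).orderBy(
    F.col('r.region').isNull().cast('int')
)
df = df.withColumn('rn', F.row_number().over(w)).filter(F.col('rn') == 1).drop('rn')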

Reshape the dataframe with a pivot

# Pivot back to one column per product, with the mapped names as values.
df = df.groupBy('region').pivot('l.product_name').agg(F.first('r.product_name'))

# df.show()
# +------+--------------------+--------------+--------------+
# |region|         conditioner|        laptop|        mobile|
# +------+--------------------+--------------+--------------+
# |  null|Conditioner Standard|Laptop Model Z|Mobile Model A|
# | North|     Conditioner Pro|Laptop Model X|Mobile Model Y|
# |  West|  Conditioner Deluxe|          null|Mobile Model Z|
# +------+--------------------+--------------+--------------+
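
Since the set of product columns is already known from product_class_mapping, passing the values to pivot explicitly avoids the extra job Spark otherwise runs to collect the distinct pivot values (a minor variant of the line above):

df = (
    df.groupBy('region')
    .pivot('l.product_name', list(product_class_mapping))
    .agg(F.first('r.product_name'))
)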