I have two DataFrames, products_df and categories_df.
products_df:
+------+------+------+-----------+
|region|laptop|mobile|conditioner|
+------+------+------+-----------+
| North|  L123|  M456|       C789|
|  West|  NULL|  M789|       C123|
|  NULL|  L456|  M123|       C456|
+------+------+------+-----------+
categories_df:
+------+--------------------+------------+-----------+
|region|        product_name|product_code|      class|
+------+--------------------+------------+-----------+
| North|      Laptop Model X|        L123|electronics|
| North|      Mobile Model Y|        M456|electronics|
|  NULL|      Mobile Model Z|        M789|electronics|
|  NULL|      Laptop Model Z|        L456|electronics|
|  NULL|      Mobile Model A|        M123|electronics|
|  West|  Conditioner Deluxe|        C123| appliances|
| North|     Conditioner Pro|        C789| appliances|
|  NULL|Conditioner Standard|        C456| appliances|
+------+--------------------+------------+-----------+
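products_df has one column per product (laptop, mobile, conditioner), and the values are product codes. The requirement is to replace each product code with its product name. For each product code, the mapped product name has to be looked up in categories_df based on the product class (electronics, appliances) and the region.

Besides products_df, there can be other DataFrames with different product columns, but they all map to categories_df. To handle this, I wrote the function below, which takes a products DataFrame as dynamic input together with the constant categories_df. To map products to classes, I pass a dictionary that maps each product column to its class in categories_df: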
product_class_mapping = {
    "laptop": "electronics",
    "mobile": "electronics",
    "conditioner": "appliances",
}
from pyspark.sql import functions as F


def set_product_name(products_df, categories_df, product_class_mapping):
    # One left join per product column: swap the code for its mapped name.
    for product_col, product_class in product_class_mapping.items():
        products_df = (
            products_df.alias("p")
            .join(
                categories_df.alias("c"),
                (F.col("c.class") == product_class)
                & (F.col(f"p.{product_col}") == F.col("c.product_code"))
                # A NULL region in categories_df matches any region.
                & (F.col("c.region").isNull() | (F.col("p.region") == F.col("c.region"))),
                how="left",
            )
            .select(F.col("p.*"), F.col("c.product_name"))
            .withColumn(
                product_col,
                F.when(F.col(product_col).isNotNull(), F.col("product_name")).otherwise(
                    F.col(product_col)
                ),
            )
            .drop("product_name")
        )
    print("Final mapped products_df:")
    products_df.show()
    return products_df
It produces the expected output, where each product_code is replaced by its product_name:
Final mapped products_df:
+------+--------------+--------------+--------------------+
|region|        laptop|        mobile|         conditioner|
+------+--------------+--------------+--------------------+
| North|Laptop Model X|Mobile Model Y|     Conditioner Pro|
|  West|          NULL|Mobile Model Z|  Conditioner Deluxe|
|  NULL|Laptop Model Z|Mobile Model A|Conditioner Standard|
+------+--------------+--------------+--------------------+
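However, I doubt this is the best solution, since it uses a for loop and performs a join on every iteration. Are there any alternatives we could consider?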
Melt the products DataFrame
# DataFrame.melt is available from PySpark 3.4 onwards.
df = products_df.melt(
    ids=['region'],
    values=list(product_class_mapping),
    variableColumnName='product_name',
    valueColumnName='product_code'
)
# df.show()
# +------+------------+------------+
# |region|product_name|product_code|
# +------+------------+------------+
# | North|      laptop|        L123|
# | North|      mobile|        M456|
# | North| conditioner|        C789|
# |  West|      laptop|        null|
# |  West|      mobile|        M789|
# |  West| conditioner|        C123|
# |  null|      laptop|        L456|
# |  null|      mobile|        M123|
# |  null| conditioner|        C456|
# +------+------------+------------+
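For readers unfamiliar with melt, the wide-to-long reshape above can be modelled in plain Python without Spark. This is only an illustrative sketch: the `melt_rows` helper and the list-of-dicts representation are assumptions made for the example and are not part of the PySpark API.

```python
# Plain-Python model of the wide-to-long reshape (hypothetical helper, not Spark code).
def melt_rows(rows, id_col, value_cols):
    """Turn each wide row into one long row per product column."""
    long_rows = []
    for row in rows:
        for col in value_cols:
            long_rows.append(
                {id_col: row[id_col], "product_name": col, "product_code": row[col]}
            )
    return long_rows


# Sample data copied from products_df; None stands in for NULL.
products = [
    {"region": "North", "laptop": "L123", "mobile": "M456", "conditioner": "C789"},
    {"region": "West", "laptop": None, "mobile": "M789", "conditioner": "C123"},
    {"region": None, "laptop": "L456", "mobile": "M123", "conditioner": "C456"},
]

melted = melt_rows(products, "region", ["laptop", "mobile", "conditioner"])
print(len(melted))  # 9 rows: 3 input rows x 3 product columns
```

Each input row contributes one output row per product column, which is why the melted table has nine rows and keeps the NULL laptop code for West.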
Join with the product mapping to pull in the class
mapping = spark.createDataFrame(product_class_mapping.items(), ['product_name', 'class'])
df = df.join(F.broadcast(mapping), on='product_name', how='left')
# df.show()
# +------------+------+------------+-----------+
# |product_name|region|product_code|      class|
# +------------+------+------------+-----------+
# |      laptop| North|        L123|electronics|
# |      mobile| North|        M456|electronics|
# | conditioner| North|        C789| appliances|
# |      laptop|  West|        null|electronics|
# |      mobile|  West|        M789|electronics|
# | conditioner|  West|        C123| appliances|
# |      laptop|  null|        L456|electronics|
# |      mobile|  null|        M123|electronics|
# | conditioner|  null|        C456| appliances|
# +------------+------+------------+-----------+
Build the join condition from the mapping logic and join with categories_df
join_cond = (
    (
        (df['region'] == categories_df['region'])
        | df['region'].isNull() | categories_df['region'].isNull()
    )
    & (df['class'] == categories_df['class'])
    & (df['product_code'] == categories_df['product_code'])
)
df = df.alias('l').join(categories_df.alias('r'), on=join_cond)
df = df.select('l.region', 'l.product_name', 'r.product_name')
# df.show()
# +------+------------+--------------------+
# |region|product_name|        product_name|
# +------+------------+--------------------+
# |  West| conditioner|  Conditioner Deluxe|
# |  null| conditioner|Conditioner Standard|
# | North| conditioner|     Conditioner Pro|
# | North|      laptop|      Laptop Model X|
# |  null|      laptop|      Laptop Model Z|
# |  null|      mobile|      Mobile Model A|
# | North|      mobile|      Mobile Model Y|
# |  West|      mobile|      Mobile Model Z|
# +------+------------+--------------------+
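The join condition can also be read as a row-level predicate. The sketch below restates it in plain Python so the NULL handling is easier to follow; `rows_match` is a hypothetical name, the dicts are stand-ins for rows, and `None` plays the role of SQL NULL. Note that this condition also matches when the product-side region is NULL, which is slightly broader than the condition used in the question.

```python
# Plain-Python reading of the join condition (hypothetical helper, not Spark code).
def rows_match(product_row, category_row):
    """Return True when a melted product row should pair with a category row."""
    region_ok = (
        product_row["region"] is None
        or category_row["region"] is None
        or product_row["region"] == category_row["region"]
    )
    class_ok = (
        product_row["class"] is not None
        and product_row["class"] == category_row["class"]
    )
    # In SQL, NULL = NULL is not true, so a missing code never matches.
    code_ok = (
        product_row["product_code"] is not None
        and product_row["product_code"] == category_row["product_code"]
    )
    return region_ok and class_ok and code_ok


west_mobile = {"region": "West", "product_code": "M789", "class": "electronics"}
west_laptop = {"region": "West", "product_code": None, "class": "electronics"}
model_z = {"region": None, "product_code": "M789", "class": "electronics"}
deluxe = {"region": "West", "product_code": "C123", "class": "appliances"}
north_c123 = {"region": "North", "product_code": "C123", "class": "appliances"}

print(rows_match(west_mobile, model_z))   # True: the category region is NULL
print(rows_match(west_laptop, model_z))   # False: the product code is NULL
print(rows_match(north_c123, deluxe))     # False: regions differ and neither is NULL
```

Because the join is an inner join, rows for which the predicate is never true (such as the West laptop with a NULL code) do not appear in the joined output, which is why the table above has eight rows rather than nine.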
Reshape the DataFrame with pivot
df = df.groupBy('region').pivot('l.product_name').agg(F.first('r.product_name'))
# df.show()
# +------+--------------------+--------------+--------------+
# |region|         conditioner|        laptop|        mobile|
# +------+--------------------+--------------+--------------+
# |  null|Conditioner Standard|Laptop Model Z|Mobile Model A|
# | North|     Conditioner Pro|Laptop Model X|Mobile Model Y|
# |  West|  Conditioner Deluxe|          null|Mobile Model Z|
# +------+--------------------+--------------+--------------+
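The final pivot can be modelled in plain Python as well. The sketch below assumes the joined rows are available as `(region, column, name)` tuples, a representation chosen only for this example, and mirrors `F.first` by keeping the first name seen for each cell. `pivot_rows` is a hypothetical helper, not a Spark function.

```python
# Plain-Python model of groupBy('region').pivot(...).agg(first(...)).
def pivot_rows(joined, columns):
    """Group long rows by region and spread product names back into columns."""
    table = {}
    for region, column, name in joined:
        # Every region starts with all product columns set to None (NULL).
        cells = table.setdefault(region, {c: None for c in columns})
        if cells[column] is None:
            cells[column] = name  # keep the first value, like F.first
    return table


# Joined rows copied from the output of the previous step.
joined = [
    ("West", "conditioner", "Conditioner Deluxe"),
    (None, "conditioner", "Conditioner Standard"),
    ("North", "conditioner", "Conditioner Pro"),
    ("North", "laptop", "Laptop Model X"),
    (None, "laptop", "Laptop Model Z"),
    (None, "mobile", "Mobile Model A"),
    ("North", "mobile", "Mobile Model Y"),
    ("West", "mobile", "Mobile Model Z"),
]

wide = pivot_rows(joined, ["conditioner", "laptop", "mobile"])
print(wide["West"])
# {'conditioner': 'Conditioner Deluxe', 'laptop': None, 'mobile': 'Mobile Model Z'}
```

Since the rows are grouped by region, this approach relies on each region appearing at most once in products_df. If that does not hold, several input rows would collapse into one, so a unique row identifier would probably need to be carried through the melt and used as the grouping key instead.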