我有一个包含 2M 行的 PySpark DataFrame,称为 inventory,其中包含以下列:
类别_id | 子类别_id | 产品代码 | 产品名称 |
---|---|---|---|
1001 | A001 | X123 | 小工具A |
1001 | A002 | X456 | 小工具 B |
2002 | B003 | Y123 | 小工具C |
3003 | C000 | Z123 | 小工具D |
3003 | C002 | Z456 | 小工具E |
3003 | C003 | Z789 | 小工具 F |
我想根据category_id中的条件字典映射sub_category_id
如果
category_id
是1001
,地图:
sub_category_id
A001
至 M001
sub_category_id
A002
至 M002
如果
category_id
是2002
,地图:
sub_category_id
B003
至 N003
...这是映射示例:
mappings = [
{
"conditions": [{"column": "category_id", "values": ["1001"]}],
"values_mapping": {
"A001": "M001",
"A002": "M002"
}
},
{
"conditions": [{"column": "category_id", "values": ["2002"]}],
"values_mapping": {
"B003": "N003"
}
},
{
"conditions": [{"column": "category_id", "values": ["3003"]}],
"values_mapping": {
"C001": "P001",
"C002": "P002",
"C003": "P003"
}
}
]
我想在 PySpark 中系统地实现这一点,使用配置字典来定义条件和映射。
我尝试使用 for 循环来一一过滤每个条件,应用映射,然后合并每个过滤结果。然而,表现却很差。
如何在 PySpark 中有效地实现这一目标?
这应该能够解决您的问题假设您的映射已提供,稍后您可以删除或保留并重命名映射的子类别列
df = inputDF.withColumn("mapped_sub_category_id", F.col("sub_category_id"))
for rule in mappings:
conditions = rule["conditions"]
values_mapping = rule["values_mapping"]
category_condition = F.col(conditions[0]["column"]).isin(conditions[0]["values"])
for original_value, mapped_value in values_mapping.items():
df = df.withColumn("mapped_sub_category_id",F.when(category_condition & (F.col("sub_category_id") == original_value), mapped_value).otherwise(F.col("mapped_sub_category_id")))