Pyspark 中的条件映射

问题描述 投票:0回答:1

我有一个包含 2M 行的 PySpark DataFrame,称为 inventory,其中包含以下列:

类别_id 子类别_id 产品代码 产品名称
1001 A001 X123 小工具A
1001 A002 X456 小工具 B
2002 B003 Y123 小工具C
3003 C000 Z123 小工具D
3003 C002 Z456 小工具E
3003 C003 Z789 小工具 F

我想根据category_id中的条件字典映射sub_category_id

  1. 如果

    category_id
    1001
    ,地图:

    • sub_category_id
      A001
      M001
    • sub_category_id
      A002
      M002
    • 如果 sub_category_id 不在映射中,则不执行任何操作。
  2. 如果

    category_id
    2002
    ,地图:

    • sub_category_id
      B003
      N003
      ...

这是映射示例:


mappings = [
    {
        "conditions": [{"column": "category_id", "values": ["1001"]}],
        "values_mapping": {
            "A001": "M001",
            "A002": "M002"
        }
    },
    {
        "conditions": [{"column": "category_id", "values": ["2002"]}],
        "values_mapping": {
            "B003": "N003"
        }
    },
    {
        "conditions": [{"column": "category_id", "values": ["3003"]}],
        "values_mapping": {
            "C001": "P001",
            "C002": "P002",
            "C003": "P003"
        }
    }
]

我想在 PySpark 中系统地实现这一点,使用配置字典来定义条件和映射。

我尝试使用 for 循环来一一过滤每个条件,应用映射,然后合并每个过滤结果。然而,表现却很差。

如何在 PySpark 中有效地实现这一目标?

apache-spark pyspark apache-spark-sql
1个回答
0
投票

这应该能够解决您的问题假设您的映射已提供,稍后您可以删除或保留并重命名映射的子类别列

df = inputDF.withColumn("mapped_sub_category_id", F.col("sub_category_id"))

for rule in mappings:
    conditions = rule["conditions"]
    values_mapping = rule["values_mapping"]

    category_condition = F.col(conditions[0]["column"]).isin(conditions[0]["values"])

    for original_value, mapped_value in values_mapping.items():
        df = df.withColumn("mapped_sub_category_id",F.when(category_condition & (F.col("sub_category_id") == original_value), mapped_value).otherwise(F.col("mapped_sub_category_id")))
© www.soinside.com 2019 - 2024. All rights reserved.