Chaining multiple WHEN conditions in a scalable way in PySpark


I have a dictionary (the variable `pats`) holding many `when` arguments, i.e. condition/value pairs:

from pyspark.sql import functions as F
df = spark.createDataFrame([("ė",), ("2",), ("",), ("@",)], ["col1"])

pats = {
  r"^\d$"          :"digit",
  r"^\p{L}$"       :"letter",
  r"^[\p{P}\p{S}]$":"spec_char",
  r"^$"            :"empty"
}

whens = (
    F.when(F.col("col1").rlike(list(pats)[0]), pats[list(pats)[0]])
     .when(F.col("col1").rlike(list(pats)[1]), pats[list(pats)[1]])
     .when(F.col("col1").rlike(list(pats)[2]), pats[list(pats)[2]])
     .when(F.col("col1").rlike(list(pats)[3]), pats[list(pats)[3]])
     .otherwise(F.col("col1"))
)
df = df.withColumn("col2", whens)

df.show()
# +----+---------+
# |col1|     col2|
# +----+---------+
# |   ė|   letter|
# |   2|    digit|
# |    |    empty|
# |   @|spec_char|
# +----+---------+

I'm looking for a scalable way to chain all of the `when` conditions, so that I don't have to write a separate line for every key.

apache-spark dictionary pyspark conditional-statements method-chaining
2 Answers
Score: 0
  • Without `reduce`:

    # Seeding with the functions module F works because F.when(...) returns a Column,
    # and Column itself also has a .when() method, so every iteration can chain .when.
    whens = F
    for k, v in pats.items():
        whens = whens.when(F.col("col1").rlike(k), v)
    whens = whens.otherwise(F.col("col1"))
    

    Full code:

    from pyspark.sql import functions as F
    df = spark.createDataFrame([("ė",), ("2",), ("",), ("@",)], ["col1"])
    
    pats = {
      r"^\d$"          :"digit",
      r"^\p{L}$"       :"letter",
      r"^[\p{P}\p{S}]$":"spec_char",
      r"^$"            :"empty"
    }
    
    whens = F
    for k, v in pats.items():
        whens = whens.when(F.col("col1").rlike(k), v)
    whens = whens.otherwise(F.col("col1"))
    
    df = df.withColumn("col2", whens)
    
    df.show()
    # +----+---------+
    # |col1|     col2|
    # +----+---------+
    # |   ė|   letter|
    # |   2|    digit|
    # |    |    empty|
    # |   @|spec_char|
    # +----+---------+
    
  • Using `reduce` (a variant over `items()` is sketched at the end of this answer):

    from functools import reduce
    
    whens = reduce(
        lambda acc, p: acc.when(F.col("col1").rlike(p), pats[p]),
        pats.keys(),
        F
    ).otherwise(F.col("col1"))
    

    Full code:

    from pyspark.sql import functions as F
    from functools import reduce
    df = spark.createDataFrame([("ė",), ("2",), ("",), ("@",)], ["col1"])
    
    pats = {
      r"^\d$"          :"digit",
      r"^\p{L}$"       :"letter",
      r"^[\p{P}\p{S}]$":"spec_char",
      r"^$"            :"empty"
    }
    
    whens = reduce(
        lambda acc, p: acc.when(F.col("col1").rlike(p), pats[p]),
        pats.keys(),
        F
    ).otherwise(F.col("col1"))
    
    df = df.withColumn("col2", whens)
    
    df.show()
    # +----+---------+
    # |col1|     col2|
    # +----+---------+
    # |   ė|   letter|
    # |   2|    digit|
    # |    |    empty|
    # |   @|spec_char|
    # +----+---------+
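
A small addition (not part of the answer above): both versions assume `pats` is non-empty, since the bare `functions` module has no `otherwise`. The `reduce` version can also fold over `pats.items()`, so the lambda receives each pattern/label pair directly instead of looking the value up by key. A minimal sketch:

    from functools import reduce
    from pyspark.sql import functions as F

    # Same result as above, but iterating over (pattern, label) pairs.
    whens = reduce(
        lambda acc, kv: acc.when(F.col("col1").rlike(kv[0]), kv[1]),
        pats.items(),
        F
    ).otherwise(F.col("col1"))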
    

Score: -1

I use the snippet below as a workaround.

from typing import List

from pyspark.sql import DataFrame as SDF  # SDF is used here as an alias for the Spark DataFrame type
from pyspark.sql import functions as F


def parse_if_else_statements(sdf: SDF, statements: List[tuple]) -> SDF:
    """
    Chain if/else statements with when & otherwise in PySpark.

    Parameters
    ----------
    sdf: Spark DataFrame
        The dataframe on which to apply the if/else statements.

    statements: List[tuple]
        The if/else statements as a list of tuples. The order of tuple elements is
        col_name, if_condition, do, else_do.
    """
    # TODO account for nested if/else statements
    for statement in statements:
        col_name, if_cond, do, else_do = statement
        sdf = sdf.withColumn(col_name, F.when(if_cond, do).otherwise(else_do))
    return sdf

Then you can pass the conditional statements as:

from pyspark.sql.functions import col

statements = [
    # (column_name, if_condition, do, else_do)
    ('Col A', col('Col B') == '90', col('col C'), col('Col D')),
]
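
For completeness, a minimal usage sketch (the sample rows and column values below are made up for illustration; `spark` is assumed to be an existing SparkSession):

# Hypothetical input containing the columns referenced in `statements`;
# 'Col A' does not exist yet and is created by withColumn inside the helper.
sdf = spark.createDataFrame(
    [("90", "c1", "d1"), ("91", "c2", "d2")],
    ["Col B", "col C", "Col D"],
)

sdf = parse_if_else_statements(sdf, statements)
sdf.show()
# Rows where Col B == '90' get 'col C' copied into 'Col A'; other rows get 'Col D'.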