I have a dictionary (the variable pats) containing many when arguments: conditions and values.
from pyspark.sql import functions as F
df = spark.createDataFrame([("ė",), ("2",), ("",), ("@",)], ["col1"])
pats = {
    r"^\d$"          : "digit",
    r"^\p{L}$"       : "letter",
    r"^[\p{P}\p{S}]$": "spec_char",
    r"^$"            : "empty"
}
whens = (
    F.when(F.col("col1").rlike(list(pats)[0]), pats[list(pats)[0]])
     .when(F.col("col1").rlike(list(pats)[1]), pats[list(pats)[1]])
     .when(F.col("col1").rlike(list(pats)[2]), pats[list(pats)[2]])
     .when(F.col("col1").rlike(list(pats)[3]), pats[list(pats)[3]])
     .otherwise(F.col("col1"))
)
df = df.withColumn("col2", whens)
df.show()
# +----+---------+
# |col1| col2|
# +----+---------+
# | ė| letter|
# | 2| digit|
# | | empty|
# | @|spec_char|
# +----+---------+
I'm looking for a scalable way to chain all the when conditions, so that I don't have to write a separate line for every key.
Without reduce
whens = F
for k, v in pats.items():
    whens = whens.when(F.col("col1").rlike(k), v)
whens = whens.otherwise(F.col("col1"))
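This works because F.when(condition, value) is a module-level function that returns a Column, and Column has a .when method with the same signature. So the first loop iteration calls the function on the module F, and every later iteration chains on the resulting Column.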
Full code:
from pyspark.sql import functions as F
df = spark.createDataFrame([("ė",), ("2",), ("",), ("@",)], ["col1"])
pats = {
    r"^\d$"          : "digit",
    r"^\p{L}$"       : "letter",
    r"^[\p{P}\p{S}]$": "spec_char",
    r"^$"            : "empty"
}
whens = F
for k, v in pats.items():
    whens = whens.when(F.col("col1").rlike(k), v)
whens = whens.otherwise(F.col("col1"))
df = df.withColumn("col2", whens)
df.show()
# +----+---------+
# |col1| col2|
# +----+---------+
# | ė| letter|
# | 2| digit|
# | | empty|
# | @|spec_char|
# +----+---------+
Using reduce
from functools import reduce

whens = reduce(
    lambda acc, p: acc.when(F.col("col1").rlike(p), pats[p]),
    pats.keys(),
    F
).otherwise(F.col("col1"))
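Equivalently, you can fold over pats.items() so the lambda receives each value directly instead of looking it up again; a minor variant of the same idea:

from functools import reduce

whens = reduce(
    lambda acc, kv: acc.when(F.col("col1").rlike(kv[0]), kv[1]),
    pats.items(),
    F
).otherwise(F.col("col1"))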
Full code:
from pyspark.sql import functions as F
from functools import reduce
df = spark.createDataFrame([("ė",), ("2",), ("",), ("@",)], ["col1"])
pats = {
    r"^\d$"          : "digit",
    r"^\p{L}$"       : "letter",
    r"^[\p{P}\p{S}]$": "spec_char",
    r"^$"            : "empty"
}
whens = reduce(
    lambda acc, p: acc.when(F.col("col1").rlike(p), pats[p]),
    pats.keys(),
    F
).otherwise(F.col("col1"))
df = df.withColumn("col2", whens)
df.show()
# +----+---------+
# |col1| col2|
# +----+---------+
# | ė| letter|
# | 2| digit|
# | | empty|
# | @|spec_char|
# +----+---------+
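If you need this pattern in several places, the chain-building can be wrapped in a small reusable helper. chain_whens below is a hypothetical name, not part of any library; it is just the loop above packaged as a function:

def chain_whens(col_name, patterns):
    # Fold a {regex: label} mapping into one chained CASE WHEN expression.
    whens = F
    for pattern, label in patterns.items():
        whens = whens.when(F.col(col_name).rlike(pattern), label)
    # Fall back to the original value when no pattern matches.
    return whens.otherwise(F.col(col_name))

df = df.withColumn("col2", chain_whens("col1", pats))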
I use the snippet below as a workaround.
from typing import List
from pyspark.sql import DataFrame as SDF
from pyspark.sql import functions as F

def parse_if_else_statements(sdf: SDF, statements: List[tuple]) -> SDF:
    """
    Chain if/else statements with when & otherwise in PySpark.

    Parameters
    ----------
    sdf : Spark DataFrame
        The dataframe on which to apply the if/else statements.
    statements : List[tuple]
        The if/else statements as a list of tuples. The tuple elements are,
        in order: col_name, if_condition, do, else_do.
    """
    # TODO: account for nested if/else statements
    for statement in statements:
        col_name, if_cond, do, else_do = statement
        sdf = sdf.withColumn(col_name, F.when(if_cond, do).otherwise(else_do))
    return sdf
You can then pass the conditional statements as:
statements = [
    # (column_name, if_condition, do, else_do)
    ('Col A', col('Col B') == '90', col('col C'), col('Col D')),
]
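A minimal end-to-end sketch of how this could be called (the data and column names here are made up for illustration):

from pyspark.sql.functions import col

sdf = spark.createDataFrame([('90', 'x', 'y'), ('10', 'x', 'y')], ['Col B', 'col C', 'Col D'])
statements = [('Col A', col('Col B') == '90', col('col C'), col('Col D'))]
sdf = parse_if_else_statements(sdf, statements)
# Col A is taken from col C where Col B == '90', otherwise from Col D.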