result = data_frame\
.withColumn(f'{self.constraint_colname}_count', F.count(self.constraint_colname).over(w))\
.withColumn(f'{self.constraint_colname}_occurences', F.lit(self.occurences).cast('long'))\
.filter(F.col(f'{self.constraint_colname}_occurences') == 1)
operator = '=='
result = data_frame\
.withColumn(f'{self.constraint_colname}_count', F.count(self.constraint_colname).over(w))\
.withColumn(f'{self.constraint_colname}_occurences', F.lit(self.occurences).cast('long'))\
.filter(F.col(f'{self.constraint_colname}_occurences') operator 1)
可以吗?我想避免实施 if-elif 语句
from pyspark.sql.column import Column
from pyspark.sql.functions import col
def compare_columns(col1, col2, operator: str):
allowed_operator_list = ["<", ">", ">=", "<=", "!=", "=="]
# Check if col1 and col2 are PySpark columns
assert isinstance(col1, Column), "col1 must be a PySpark Column"
assert isinstance(col2, Column), "col2 must be a PySpark Column"
# Check if operator is in the allowed list
assert operator in allowed_operator_list, "Operator not in allowed list."
col_1_name = col1._jc.toString()
col_2_name = col2._jc.toString()
result_function = eval(f"col('{col_1_name}') {operator} col('{col_2_name}')")
return result_function
# usage:
df.select(compare_columns(col("col1"), col("col2"), "=="))
一般来说,你应该小心使用 eval 函数,因为它可以评估任何代码,所以我不确定我是否会推荐这个。
from pyspark.sql.column import Column
from pyspark.sql.functions import col
def compare_columns(col1, col2, operator: str):
allowed_operator_list = ["<", ">", ">=", "<=", "!=", "=="]
# Check if col1 and col2 are PySpark columns
assert isinstance(col1, Column), "col1 must be a PySpark Column"
assert isinstance(col2, Column), "col2 must be a PySpark Column"
# Check if operator is in the allowed list
assert operator in allowed_operator_list, "Operator not in allowed list."
# Using a dictionary to mimic switch-case behavior
operations = {
"==": col1 == col2,
"<": col1 < col2,
">": col1 > col2,
">=": col1 >= col2,
"<=": col1 <= col2,
"!=": col1 != col2,
return operations[operator]