Dynamically setting the operator in a where/filter clause of a PySpark DataFrame


I have this DataFrame:

result = data_frame\
    .withColumn(f'{self.constraint_colname}_count', F.count(self.constraint_colname).over(w))\
    .withColumn(f'{self.constraint_colname}_occurences', F.lit(self.occurences).cast('long'))\
    .filter(F.col(f'{self.constraint_colname}_occurences') == 1)

In the filter clause, I would like to set the == operator dynamically, choosing it from a list such as <, >, >=, <=, !=.

I tried:

operator = '=='
result = data_frame\
    .withColumn(f'{self.constraint_colname}_count', F.count(self.constraint_colname).over(w))\
    .withColumn(f'{self.constraint_colname}_occurences', F.lit(self.occurences).cast('long'))\
    .filter(F.col(f'{self.constraint_colname}_occurences') operator 1)

Is this possible? I would like to avoid implementing if-elif statements.

pyspark apache-spark-sql
1 answer

One way to achieve this is to write a function that builds the comparison for you.

from pyspark.sql.column import Column
from pyspark.sql.functions import col

def compare_columns(col1, col2, operator: str):
    allowed_operator_list = ["<", ">", ">=", "<=", "!=", "=="]
    
    # Check if col1 and col2 are PySpark columns
    assert isinstance(col1, Column), "col1 must be a PySpark Column"
    assert isinstance(col2, Column), "col2 must be a PySpark Column"

    # Check if operator is in the allowed list
    assert operator in allowed_operator_list, "Operator not in allowed list."

    col_1_name = col1._jc.toString()
    col_2_name = col2._jc.toString()

    result_function = eval(f"col('{col_1_name}') {operator} col('{col_2_name}')")
    return result_function

# usage:
df.select(compare_columns(col("col1"), col("col2"), "=="))

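In general, you should be careful with eval, because it can evaluate arbitrary code, so I'm not sure I would recommend this. The check against the allowed operator list limits the operator string, but the column names are still interpolated into the evaluated expression.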

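Alternatively, and arguably better, you can do the following. However, you have to spell out every option you want to support, and it seems that is what you wanted to avoid.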

from pyspark.sql.column import Column
from pyspark.sql.functions import col

def compare_columns(col1, col2, operator: str):
    allowed_operator_list = ["<", ">", ">=", "<=", "!=", "=="]
    
    # Check if col1 and col2 are PySpark columns
    assert isinstance(col1, Column), "col1 must be a PySpark Column"
    assert isinstance(col2, Column), "col2 must be a PySpark Column"

    # Check if operator is in the allowed list
    assert operator in allowed_operator_list, "Operator not in allowed list."

    # Using a dictionary to mimic switch-case behavior
    operations = {
        "==": col1 == col2,
        "<": col1 < col2,
        ">": col1 > col2,
        ">=": col1 >= col2,
        "<=": col1 <= col2,
        "!=": col1 != col2,
    }

    return operations[operator]
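
A third option, sketched here as an assumption rather than as part of the original answer, is to look the operator up in Python's standard operator module. This avoids eval and builds only the one comparison that is requested. The names COMPARISON_OPERATORS and apply_operator are hypothetical. The sketch relies on the fact that PySpark's Column overloads the comparison operators, so the same lookup should work for Column operands as it does for the plain Python values used below.

```python
import operator

# Map each operator string to the matching function from the standard library.
# operator.eq(a, b) is the same as a == b, and so on for the others.
COMPARISON_OPERATORS = {
    "==": operator.eq,
    "!=": operator.ne,
    "<": operator.lt,
    "<=": operator.le,
    ">": operator.gt,
    ">=": operator.ge,
}


def apply_operator(left, right, op: str):
    """Return `left <op> right`, where op is a key of COMPARISON_OPERATORS."""
    try:
        compare = COMPARISON_OPERATORS[op]
    except KeyError:
        raise ValueError(f"Unsupported operator: {op!r}") from None
    return compare(left, right)


# Plain-Python check of the lookup itself:
print(apply_operator(3, 1, ">"))   # True
print(apply_operator(3, 1, "=="))  # False
```

In the pipeline from the question, the filter would then presumably read .filter(apply_operator(F.col(f'{self.constraint_colname}_occurences'), 1, op)), where op is a string such as '==' picked at runtime.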
