Cannot convert a column with a simple filter condition into a boolean using any operator

Problem description

I am trying to build a filter condition dynamically from a dict structure in Python. It is a very simple condition, yet it gives the following error:

Final constructed filter condition: Column<'(CompanyCode IN (1930, 1931))'>
Type of final_condition: <class 'pyspark.sql.column.Column'>
PySparkValueError: [CANNOT_CONVERT_COLUMN_INTO_BOOL] Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.
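For context, the truth test on the Column is what raises here, independent of the data or the filter logic; a minimal sketch (assuming PySpark 3.4+, where this surfaces as PySparkValueError, a ValueError subclass) that triggers the same error:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
cond = F.col("Code").isin(1930, 1931)  # a Column expression, not a bool

try:
    if cond:  # Python calls Column.__bool__, which always raises
        pass
except ValueError as e:
    print(e)  # [CANNOT_CONVERT_COLUMN_INTO_BOOL] ...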

My data and columns are as follows:

data = [("John", 1930), ("Doe", 1931), ("Jane", 1940)]
columns = ["Name", "Code"]

I am trying to mimic the following static filter with dynamic code:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(data, schema=columns)
df.filter(F.col("Code").isin(1930, 1931)).show()

The condition structure I pass in looks like this:

    {
        "conditions": [
            {"column": "Code", "operator": "IN", "values": [1930, 1931]}
        ]
    }

And below is my code:

def construct_filter_condition(conditions):
    """
    Construct a filter condition from a list of condition dicts.
    :param conditions: List of conditions
    :return: PySpark Column expression, or None if no valid conditions
    """
    combined_conditions = []
    for condition in conditions:
        column = condition["column"]
        operator = condition["operator"].lower()
        values = condition.get("values")
        value = condition.get("value")

        if operator == "in":
            new_condition = F.col(column).isin(values)
        elif operator == "eq":
            new_condition = F.col(column) == value
        elif operator == "ne":
            new_condition = F.col(column) != value
        elif operator == "lt":
            new_condition = F.col(column) < value
        elif operator == "le":
            new_condition = F.col(column) <= value
        elif operator == "gt":
            new_condition = F.col(column) > value
        elif operator == "ge":
            new_condition = F.col(column) >= value
        elif operator == "not_in":
            new_condition = ~F.col(column).isin(values)
        else:
            continue  # Skip invalid operators

        combined_conditions.append(new_condition)

    # Combine all conditions using 'AND' logic
    # (this must sit outside the loop, or the function returns
    # after the first condition)
    if combined_conditions:
        final_condition = combined_conditions[0]
        for cond in combined_conditions[1:]:
            final_condition = final_condition & cond
        return final_condition
    else:
        return None
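As an aside, the AND-combination loop at the end can be written more compactly with functools.reduce; a sketch of the same fold (the helper name combine_and is my own):

from functools import reduce
import operator

def combine_and(conditions):
    # Fold a list of Column conditions together with '&'; None for an empty list
    return reduce(operator.and_, conditions) if conditions else None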

Example conditions:

    flt_conditions = [
        {
            "conditions": [
                {"column": "CompanyCode", "operator": "IN", "values": [1930, 1931]}
            ]
        }
    ]

Build and apply the filter:

final_condition = construct_filter_condition(flt_conditions[0]["conditions"])
if final_condition:
    print(f"Constructed filter condition: {final_condition}")
    filtered_df = df.filter(final_condition)
    print(f"DataFrame filter syntax: df.filter({final_condition})")
    filtered_df.show()
else:
    print("No valid filter conditions provided.")
1 Answer

This is really just a problem with if final_condition:, since final_condition is a Column filter and cannot be interpreted as a boolean. Instead, try if final_condition is not None:, because your function construct_filter_condition returns either a Column filter or None:

if final_condition is not None:
    print(f"Constructed filter condition: {final_condition}")
    filtered_df = df.filter(final_condition)
    print(f"DataFrame filter syntax: df.filter({final_condition})")
    filtered_df.show()
else:
    print("No valid filter conditions provided.")

Output:

Constructed filter condition: Column<'(Code IN (1930, 1931))'>
DataFrame filter syntax: df.filter(Column<'(Code IN (1930, 1931))'>)
+----+----+
|Name|Code|
+----+----+
|John|1930|
| Doe|1931|
+----+----+
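The distinction is that is not None is an identity check, so Python never calls Column.__bool__, whereas a bare if final_condition: does. A quick illustration:

cond = F.col("Code").isin(1930, 1931)
print(cond is not None)  # True -- identity comparison, no boolean conversion
# bool(cond)             # would raise CANNOT_CONVERT_COLUMN_INTO_BOOL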