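I am trying to build a filter condition dynamically from a dict structure in Python. It is a very simple condition, yet it raises the following error: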
Final constructed filter condition: Column<'(CompanyCode IN (1930, 1931))'>
Type of final_condition: <class 'pyspark.sql.column.Column'>
PySparkValueError: [CANNOT_CONVERT_COLUMN_INTO_BOOL] Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.
My data and columns are as follows:
data = [("John", 1930), ("Doe", 1931), ("Jane", 1940)]
columns = ["Name", "Code"]
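I am trying to reproduce the filter below with dynamically generated code:
from pyspark.sql import functions as F
# Assumes an active SparkSession is available as `spark`
df = spark.createDataFrame(data, schema=columns)
df.filter(F.col("Code").isin(1930, 1931)).show()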
My filter conditions are defined as follows:
{
    "conditions": [
        {"column": "Code", "operator": "IN", "values": [1930, 1931]}
    ]
}
Below is my code:
def construct_filter_condition(conditions):
    """
    Construct filter condition from a list of conditions.
    :param conditions: List of conditions
    :return: PySpark Column expression
    """
    combined_conditions = []
    for condition in conditions:
        column = condition["column"]
        operator = condition["operator"].lower()
        values = condition.get("values")
        value = condition.get("value")
        if operator == "in":
            new_condition = F.col(column).isin(values)
        elif operator == "eq":
            new_condition = F.col(column) == value
        elif operator == "ne":
            new_condition = F.col(column) != value
        elif operator == "lt":
            new_condition = F.col(column) < value
        elif operator == "le":
            new_condition = F.col(column) <= value
        elif operator == "gt":
            new_condition = F.col(column) > value
        elif operator == "ge":
            new_condition = F.col(column) >= value
        elif operator == "not_in":
            new_condition = ~F.col(column).isin(values)
        else:
            continue  # Skip invalid operators
        combined_conditions.append(new_condition)
    # Combine all conditions using 'AND' logic
    if combined_conditions:
        final_condition = combined_conditions[0]
        for cond in combined_conditions[1:]:
            final_condition = final_condition & cond
        return final_condition
    else:
        return None
Sample conditions:
flt_conditions = [
    {
        "conditions": [
            {"column": "CompanyCode", "operator": "IN", "values": [1930, 1931]}
        ]
    }
]
Build and apply the filter:
final_condition = construct_filter_condition(flt_conditions[0]["conditions"])
if final_condition:
    print(f"Constructed filter condition: {final_condition}")
    filtered_df = df.filter(final_condition)
    print(f"DataFrame filter syntax: df.filter({final_condition})")
    filtered_df.show()
else:
    print("No valid filter conditions provided.")
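This is really just a problem with the line if final_condition: because final_condition is a Column expression, and a Column cannot be interpreted as a boolean. Python's truth test asks the object for a bool value, and PySpark's Column raises this error whenever that happens. Instead, I would use if final_condition is not None: since your function construct_filter_condition returns either a Column expression or None.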
if final_condition is not None:
    print(f"Constructed filter condition: {final_condition}")
    filtered_df = df.filter(final_condition)
    print(f"DataFrame filter syntax: df.filter({final_condition})")
    filtered_df.show()
else:
    print("No valid filter conditions provided.")
Output (with the condition's column set to Code, matching the sample DataFrame):
Constructed filter condition: Column<'(Code IN (1930, 1931))'>
DataFrame filter syntax: df.filter(Column<'(Code IN (1930, 1931))'>)
+----+----+
|Name|Code|
+----+----+
|John|1930|
| Doe|1931|
+----+----+
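If it helps to see the mechanism without starting Spark, here is a minimal sketch of why the truth test fails while the identity check does not. FakeColumn and describe are hypothetical stand-ins invented for this illustration; FakeColumn is not PySpark's Column class, it only imitates the one behaviour that matters here, namely raising when Python asks for its bool value.

```python
class FakeColumn:
    """Hypothetical stand-in for pyspark.sql.Column (not the real class)."""

    def __init__(self, expr):
        self.expr = expr

    def __bool__(self):
        # Imitates PySpark: a column expression has no single truth value.
        raise ValueError("Cannot convert column into bool")

    def __repr__(self):
        return f"Column<'{self.expr}'>"


def describe(condition):
    """Use an identity check, which never calls __bool__ on the object."""
    if condition is not None:
        return f"apply {condition!r}"
    return "no condition"


cond = FakeColumn("(Code IN (1930, 1931))")

try:
    if cond:  # truth test calls FakeColumn.__bool__, which raises
        pass
    truth_test_failed = False
except ValueError:
    truth_test_failed = True

print(truth_test_failed)
print(describe(cond))
print(describe(None))
```

The same reasoning applies to any check that implicitly converts the condition to a bool, such as using it with and, or, or not, which is why the error message points to &, | and ~ instead.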