I'm working with PySpark and have run into a challenging scenario: I need to dynamically retrieve the value of a field whose name is stored in another field of the same DataFrame, and then compare that retrieved value against a fixed value.
Here is the context:
I have a DataFrame df_exploded with the following schema:
root
|-- _id: string (nullable = true)
|-- name: string (nullable = true)
|-- precondition: struct (nullable = true)
| |-- field: string (nullable = true)
| |-- matchingType: string (nullable = true)
| |-- matchingValue: string (nullable = true)
|-- businessTransaction: struct (nullable = true)
| |-- someField1: string (nullable = true)
| |-- someField2: string (nullable = true)
| |-- someField3: string (nullable = true)
The precondition field is a struct holding the name of the target field (field), a comparison operator (matchingType), and the value to compare against (matchingValue).
Goal: For each row, I need to dynamically retrieve the value of the businessTransaction field named by precondition.field (which can differ from row to row) and compare it with precondition.matchingValue.
Question: How can I dynamically retrieve and compare the value of a field that is specified in another field of the same DataFrame in PySpark? Is there a way to use expr or some other function to evaluate the column path stored in dynamic_field_path and obtain the actual value?
Example data:
+---+----+-------------------------+------------------------+
|_id|name|precondition             |businessTransaction     |
+---+----+-------------------------+------------------------+
|1  |John|{someField1, equals, 100}|{someField1 -> 100, ...}|
|2  |Jane|{someField2, equals, 200}|{someField2 -> 150, ...}|
+---+----+-------------------------+------------------------+
In this example, I need to dynamically compare businessTransaction.someField1 with 100 for the first row, and businessTransaction.someField2 with 200 for the second row.
Any help or guidance on how to achieve this would be greatly appreciated!
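To make the intended semantics concrete, here is the per-row logic I am trying to express, written as plain Python over a single row (the dict literals just mirror the sample data above; this is an illustration, not Spark code):

```python
# Plain-Python sketch of the per-row lookup I want PySpark to perform.
row = {
    "precondition": {"field": "someField1", "matchingType": "equals", "matchingValue": "100"},
    "businessTransaction": {"someField1": "100", "someField2": "120", "someField3": "150"},
}
target = row["precondition"]["field"]        # the field name stored in the data: "someField1"
actual = row["businessTransaction"][target]  # dynamic lookup -> "100"
matches = actual == row["precondition"]["matchingValue"]
print(matches)  # True
```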
Here is a simplified version of my approach:
from pyspark.sql.functions import col, concat_ws, expr, lit, when  # lit was missing from the original import
# Create the dynamic field path
df_exploded = df_exploded.withColumn(
    'dynamic_field_path',
    concat_ws(".", lit("businessTransaction"), col('precondition.field'))
)
# Try to retrieve the value from the dynamically specified field using expr
df_exploded = df_exploded.withColumn(
    'dynamic_field',
    expr("dynamic_field_path")
)
# Check preconditions
df_preconditions_checked = df_exploded.withColumn(
    "is_matching_precondition",
    when(
        col('dynamic_field') == col('precondition.matchingValue'),
        True
    ).otherwise(False)
)
# Filter distinct _id where is_matching_precondition is True
df_matching_preconditions = df_preconditions_checked.filter(col("is_matching_precondition") == True).select(col('_id')).distinct()
Problem: The code above does not work as expected. The dynamic_field column ends up containing the literal string path rather than the value of the field it names, because expr("dynamic_field_path") simply references the dynamic_field_path column itself. I also get an error indicating that a Column object is not iterable.
You can use a udf to handle the dynamic comparison part, as shown below:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType
spark = SparkSession.builder.getOrCreate()
data = [
    ("1", "John", ("someField1", "equals", "100"), {"someField1": "100", "someField2": "120", "someField3": "150"}),
    ("2", "Jane", ("someField2", "greaterThan", "200"), {"someField1": "100", "someField2": "150", "someField3": "180"})
]
schema = StructType([
    StructField("_id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("precondition", StructType([
        StructField("field", StringType(), True),
        StructField("matchingType", StringType(), True),
        StructField("matchingValue", StringType(), True)
    ]), True),
    StructField("businessTransaction", StructType([
        StructField("someField1", StringType(), True),
        StructField("someField2", StringType(), True),
        StructField("someField3", StringType(), True)
    ]), True)
])
df = spark.createDataFrame(data, schema)
df.printSchema()
# root
# |-- _id: string (nullable = true)
# |-- name: string (nullable = true)
# |-- precondition: struct (nullable = true)
# | |-- field: string (nullable = true)
# | |-- matchingType: string (nullable = true)
# | |-- matchingValue: string (nullable = true)
# |-- businessTransaction: struct (nullable = true)
# | |-- someField1: string (nullable = true)
# | |-- someField2: string (nullable = true)
# | |-- someField3: string (nullable = true)
def evaluate_precondition(row):
    field = row.precondition.field
    matching_type = row.precondition.matchingType
    matching_value = row.precondition.matchingValue
    actual_value = row.businessTransaction[field]  # dynamic lookup by field name
    if matching_type == "equals":
        return int(actual_value) == int(matching_value)
    elif matching_type == "greaterThan":
        return int(actual_value) > int(matching_value)
    elif matching_type == "lessThan":
        return int(actual_value) < int(matching_value)
    else:
        return False
evaluate_precondition_udf = F.udf(evaluate_precondition, "boolean")  # declare the boolean return type; the default is string
df_preconditions_checked = df.withColumn(
    "is_matching_precondition",
    evaluate_precondition_udf(F.struct("precondition", "businessTransaction"))
)
df_preconditions_checked.show(truncate=False)
# +---+----+------------------------------+-------------------+------------------------+
# |_id|name|precondition |businessTransaction|is_matching_precondition|
# +---+----+------------------------------+-------------------+------------------------+
# |1 |John|{someField1, equals, 100} |{100, 120, 150} |true |
# |2 |Jane|{someField2, greaterThan, 200}|{100, 150, 180} |false |
# +---+----+------------------------------+-------------------+------------------------+
df_matching_preconditions = (
    df_preconditions_checked
    .filter(F.col("is_matching_precondition") == True)
    .select(F.col("_id")).distinct()
)
df_matching_preconditions.show()
# +---+
# |_id|
# +---+
# | 1|
# +---+
The trick is to pass the relevant struct columns to the UDF as a single struct and then look the value up "dynamically" inside the UDF.
You can also extend the set of comparison operators inside the UDF however you choose.