PySpark - Retrieve a value from a field that is dynamically specified in another field of the same DataFrame

Question · Votes: 0 · Answers: 1

I'm working with PySpark and have run into a challenging scenario: I need to dynamically retrieve the value of a field whose name is specified in another field of the same DataFrame, and then compare that dynamically retrieved value against a fixed value.

Here is the context:

I have a DataFrame df_exploded with the following schema:

root
|-- _id: string (nullable = true)
|-- name: string (nullable = true)
|-- precondition: struct (nullable = true)
|    |-- field: string (nullable = true)
|    |-- matchingType: string (nullable = true)
|    |-- matchingValue: string (nullable = true)
|-- businessTransaction: struct (nullable = true)
|    |-- someField1: string (nullable = true)
|    |-- someField2: string (nullable = true)
|    |-- someField3: string (nullable = true)

The precondition field is a struct containing:

  • field: the name of the field in businessTransaction whose value I need to compare.
  • matchingType: the type of comparison (e.g., equals, greaterThan).
  • matchingValue: the value to compare against.

Goal: For each row, I need to dynamically retrieve the value of the field in businessTransaction named by precondition.field (which can differ from row to row) and compare it against precondition.matchingValue.

Question: How can I dynamically retrieve and compare the value of a field specified in another field of the same DataFrame in PySpark? Is there a way to use expr or another function to evaluate the column path stored in dynamic_field_path and obtain the actual value?

Sample data:

+----+----+----------------------------+---------------------------+
|_id |name|precondition                |businessTransaction        |
+----+----+----------------------------+---------------------------+
|1   |John|{someField1, equals, 100}   |{someField1 -> 100, ...}   |
|2   |Jane|{someField2, equals, 200}   |{someField2 -> 150, ...}   |
+----+----+----------------------------+---------------------------+

In this example, I need to dynamically compare businessTransaction.someField1 against 100 for the first row, and businessTransaction.someField2 against 200 for the second.

Any help or guidance on how to achieve this would be greatly appreciated!

Here is a simplified version of my approach:

from pyspark.sql.functions import col, concat_ws, expr, lit, when

# Create the dynamic field path
df_exploded = df_exploded.withColumn(
    'dynamic_field_path',
    concat_ws(".", lit("businessTransaction"), col('precondition.field'))
)

# Try to retrieve the value from the dynamically specified field using expr
df_exploded = df_exploded.withColumn(
    'dynamic_field',
    expr("dynamic_field_path")
)

# Check preconditions
df_preconditions_checked = df_exploded.withColumn(
    "is_matching_precondition",
    when(
        col('dynamic_field') == col('precondition.matchingValue'),
        True
    ).otherwise(False)
)

# Filter distinct _id where is_matching_precondition is True
df_matching_preconditions = df_preconditions_checked.filter(col("is_matching_precondition") == True).select(col('_id')).distinct()

Problem: The code above does not work as expected. The dynamic_field column ends up containing the literal string path rather than the actual value of the dynamically specified field, and I also get an error indicating that a Column is not iterable.

python dataframe pyspark apache-spark-sql databricks
1 Answer

0 votes

You can create a `udf` to handle the dynamic comparison part, like this:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.getOrCreate()

data = [
    ("1", "John", ("someField1", "equals", "100"), {"someField1": "100", "someField2": "120", "someField3": "150"}),
    ("2", "Jane", ("someField2", "greaterThan", "200"), {"someField1": "100", "someField2": "150", "someField3": "180"})
]
schema = StructType([
    StructField("_id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("precondition", StructType([
        StructField("field", StringType(), True),
        StructField("matchingType", StringType(), True),
        StructField("matchingValue", StringType(), True)
    ]), True),
    StructField("businessTransaction", StructType([
        StructField("someField1", StringType(), True),
        StructField("someField2", StringType(), True),
        StructField("someField3", StringType(), True)
    ]), True)
])
df = spark.createDataFrame(data, schema)

df.printSchema()

# root
#  |-- _id: string (nullable = true)
#  |-- name: string (nullable = true)
#  |-- precondition: struct (nullable = true)
#  |    |-- field: string (nullable = true)
#  |    |-- matchingType: string (nullable = true)
#  |    |-- matchingValue: string (nullable = true)
#  |-- businessTransaction: struct (nullable = true)
#  |    |-- someField1: string (nullable = true)
#  |    |-- someField2: string (nullable = true)
#  |    |-- someField3: string (nullable = true)

def evaluate_precondition(row):
    field = row.precondition.field
    matching_type = row.precondition.matchingType
    matching_value = row.precondition.matchingValue
    actual_value = row.businessTransaction[field]

    if matching_type == "equals":
        return int(actual_value) == int(matching_value)
    elif matching_type == "greaterThan":
        return int(actual_value) > int(matching_value)
    elif matching_type == "lessThan":
        return int(actual_value) < int(matching_value)
    else:
        return False

evaluate_precondition_udf = F.udf(evaluate_precondition, "boolean")

df_preconditions_checked = df.withColumn(
    "is_matching_precondition",
    evaluate_precondition_udf(F.struct("precondition", "businessTransaction"))
)

df_preconditions_checked.show(truncate=False)

# +---+----+------------------------------+-------------------+------------------------+
# |_id|name|precondition                  |businessTransaction|is_matching_precondition|
# +---+----+------------------------------+-------------------+------------------------+
# |1  |John|{someField1, equals, 100}     |{100, 120, 150}    |true                    |
# |2  |Jane|{someField2, greaterThan, 200}|{100, 150, 180}    |false                   |
# +---+----+------------------------------+-------------------+------------------------+

df_matching_preconditions = (
    df_preconditions_checked
    .filter(F.col("is_matching_precondition") == True)
    .select(F.col("_id")).distinct()
)

df_matching_preconditions.show()

# +---+
# |_id|
# +---+
# |  1|
# +---+

To make this work, we pass a new `struct` column to the UDF and then fetch the value from it "dynamically" inside the UDF.

You can also extend the comparison operators in the UDF however you see fit.
