如何使用 Pandera 验证嵌套的 Spark DataFrame?

问题描述 投票:0回答:1

是否有可能使用

pandera.pyspark
验证嵌套的 Spark DataFrame?这是 StructType 的示例,但 ArrayType 也可以类似。

from pandera.pyspark import DataFrameModel, Field
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, MapType

data8 = [
    (("Anna", "Rose", ""), "NY"),
    (("James", None, "Smith"), "OH"),
    (("Julia", "", "Williams"), "OH"),
    (("Maria", "Anne", "Jones"), "NY"),
    (("Jen", "Mary", "Brown"), "NY"),
    (("Mike", "Mary", "Williams"), "OH"),
    (("Carl", "Jon", "White"), "OH"),
]

schema8 = """
name struct<firstname string,
    middlename string,
    lastname string>,
state string
"""
df8 = spark.createDataFrame(data=data8, schema=schema8)
df8.printSchema()
df8.show(truncate=False)

返回简单示例 DF:

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- state: string (nullable = true)

+----------------------+-----+
|name                  |state|
+----------------------+-----+
|{Anna, Rose, }        |NY   |
|{James, null, Smith}  |OH   |
|{Julia, , Williams}   |OH   |
|{Maria, Anne, Jones}  |NY   |
|{Jen, Mary, Brown}    |NY   |
|{Mike, Mary, Williams}|OH   |
|{Carl, Jon, White}    |OH   |
+----------------------+-----+

接下来我想用我的 Pandera 模型验证这个 DF,但我不知道如何在 Pandera 中描述嵌套模式。

他们不是工作:

# idea 1
class Person(DataFrameModel):
    name: StructType([StructField('firstname', StringType(), True), StructField('middlename', StringType(), True), StructField('lastname', StringType(), True)]) = Field()
    state: StringType() = Field()
# TypeError: Data type 'StructType([StructField('firstname', StringType(), True), StructField('middlename', StringType(), True), StructField('lastname', StringType(), True)])' not understood by Engine.

# idea 2
class PersonName(DataFrameModel):
    firstname: StringType() = Field()
    middlename: StringType() = Field()
    lastname: StringType() = Field()


class Person(DataFrameModel):
    name: PersonName
    state: StringType() = Field()
# TypeError: Data type 'PersonName' not understood by Engine.

# idea 3
schema = pa.DataFrameSchema(
    {
        "name": Column(
            StructType([
                StructField("firstname", StringType(), True),
                StructField("middlename", StringType(), True),
                StructField("lastname", StringType(), True)
            ])
        ),
        "state": Column(StringType(), nullable=False)
    }
)
# TypeError: Data type 'StructType([StructField('firstname', StringType(), True), StructField('middlename', StringType(), True), StructField('lastname', StringType(), True)])' not understood by Engine.

验证期间

Person.validate(df8)

dataframe pyspark apache-spark-sql nested pandera
1个回答
0
投票

您找到解决方案了吗?

© www.soinside.com 2019 - 2024. All rights reserved.