I have data with the following schema. I want all columns to be sorted alphabetically, and I want the result as a PySpark DataFrame.
root
|-- _id: string (nullable = true)
|-- first_name: string (nullable = true)
|-- last_name: string (nullable = true)
|-- address: struct (nullable = true)
| |-- pin: integer (nullable = true)
| |-- city: string (nullable = true)
| |-- street: string (nullable = true)
The code below sorts only the outer columns, not the nested ones.
>>> cols = df.columns
>>> df2=df[sorted(cols)]
>>> df2.printSchema()
The schema after this code looks like this:
root
|-- _id: string (nullable = true)
|-- address: struct (nullable = true)
| |-- pin: integer (nullable = true)
| |-- city: string (nullable = true)
| |-- street: string (nullable = true)
|-- first_name: string (nullable = true)
|-- last_name: string (nullable = true)
(_id comes first because of the underscore.)
The schema I want is below (the columns inside address should be sorted too):
root
|-- _id: string (nullable = true)
|-- address: struct (nullable = true)
| |-- city: string (nullable = true)
| |-- pin: integer (nullable = true)
| |-- street: string (nullable = true)
|-- first_name: string (nullable = true)
|-- last_name: string (nullable = true)
Thanks in advance.
Here is a solution that works for arbitrarily deep nesting of StructTypes and does not rely on hard-coding any column names.
To demonstrate, I created the following slightly more complex schema, where there is a second level of nesting inside the address column. Suppose your DataFrame schema is as follows:
df.printSchema()
#root
# |-- _id: string (nullable = true)
# |-- first_name: string (nullable = true)
# |-- last_name: string (nullable = true)
# |-- address: struct (nullable = true)
# | |-- pin: integer (nullable = true)
# | |-- city: string (nullable = true)
# | |-- zip: struct (nullable = true)
# | | |-- last4: integer (nullable = true)
# | | |-- first5: integer (nullable = true)
# | |-- street: string (nullable = true)
Notice the address.zip field, which contains 2 out-of-order subfields.
You can define a function that recursively steps through your schema and sorts the fields to build a Spark-SQL select expression:
from pyspark.sql.types import StructType, StructField
def schemaToSelectExpr(schema, baseField=""):
    select_cols = []
    # Visit the fields at this level in alphabetical order
    for structField in sorted(schema, key=lambda x: x.name):
        if structField.dataType.typeName() == 'struct':
            subFields = []
            # Sort the nested fields by name and recurse into each one
            for fld in sorted(structField.jsonValue()['type']['fields'],
                              key=lambda x: x['name']):
                newStruct = StructType([StructField.fromJson(fld)])
                newBaseField = structField.name
                if baseField:
                    newBaseField = baseField + "." + newBaseField
                subFields.extend(schemaToSelectExpr(newStruct, baseField=newBaseField))
            # Rebuild the struct from its sorted subfields under the original name
            select_cols.append(
                "struct(" + ",".join(subFields) + ") AS {}".format(structField.name)
            )
        else:
            # Leaf column: qualify it with the parent path, if any
            if baseField:
                select_cols.append(baseField + "." + structField.name)
            else:
                select_cols.append(structField.name)
    return select_cols
Running this on this DataFrame's schema yields the following (I have broken the long "address" string into two lines for readability):
print(schemaToSelectExpr(df.schema))
#['_id',
#'struct(address.city,address.pin,address.street,
# struct(address.zip.first5,address.zip.last4) AS zip) AS address',
# 'first_name',
# 'last_name']
Now use selectExpr to sort the columns:
df = df.selectExpr(schemaToSelectExpr(df.schema))
df.printSchema()
#root
# |-- _id: string (nullable = true)
# |-- address: struct (nullable = false)
# | |-- city: string (nullable = true)
# | |-- pin: integer (nullable = true)
# | |-- street: string (nullable = true)
# | |-- zip: struct (nullable = false)
# | | |-- first5: integer (nullable = true)
# | | |-- last4: integer (nullable = true)
# |-- first_name: string (nullable = true)
# |-- last_name: string (nullable = true)
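To see how the select expression is assembled without starting a Spark session, here is a minimal sketch of the same recursion written against a plain dict. It assumes the dict has the shape returned by df.schema.jsonValue() (a "fields" list whose entries carry "name" and "type", with nested structs represented as dicts). The names sorted_select_exprs, field and example_schema are hypothetical and exist only for this illustration.

```python
def sorted_select_exprs(schema_json, prefix=""):
    """Build Spark SQL select expressions with fields sorted by name at every level.

    schema_json is assumed to look like the output of df.schema.jsonValue().
    """
    exprs = []
    for fld in sorted(schema_json["fields"], key=lambda f: f["name"]):
        # Fully qualified path of this field, e.g. "address.zip.first5"
        path = prefix + "." + fld["name"] if prefix else fld["name"]
        ftype = fld["type"]
        if isinstance(ftype, dict) and ftype.get("type") == "struct":
            # Nested struct: recurse, then rebuild it from its sorted subfields
            inner = sorted_select_exprs(ftype, prefix=path)
            exprs.append("struct(" + ",".join(inner) + ") AS " + fld["name"])
        else:
            # Leaf field: select it by its qualified path
            exprs.append(path)
    return exprs


def field(name, ftype):
    """Hypothetical helper: one field entry in the jsonValue() layout."""
    return {"name": name, "type": ftype, "nullable": True, "metadata": {}}


# The demonstration schema from this answer, written out as a dict
example_schema = {
    "type": "struct",
    "fields": [
        field("_id", "string"),
        field("first_name", "string"),
        field("last_name", "string"),
        field("address", {
            "type": "struct",
            "fields": [
                field("pin", "integer"),
                field("city", "string"),
                field("zip", {
                    "type": "struct",
                    "fields": [
                        field("last4", "integer"),
                        field("first5", "integer"),
                    ],
                }),
                field("street", "string"),
            ],
        }),
    ],
}

for expr in sorted_select_exprs(example_schema):
    print(expr)
```

With a real DataFrame, the equivalent call would presumably be df.selectExpr(*sorted_select_exprs(df.schema.jsonValue())). Like the function above, this sketch only descends into structs; structs inside arrays or maps are selected as-is.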
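You can use the colname.* syntax to flatten your DF. For nested flattening, you can use the following:
How to flatten a struct in a Spark dataframe?
On the flattened DataFrame, you can create a new StructCol with the input columns sorted: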
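As a rough picture of the flatten-then-rebuild idea for a single struct level, the sketch below only generates the expression strings, so it runs without Spark. It swaps in explicit aliases instead of colname.* and uses Spark SQL's named_struct to reassemble the struct, so that the flattened names cannot collide with other top-level columns. The helper names flatten_exprs and rebuild_expr, and the address_ prefix convention, are assumptions made for this illustration.

```python
def flatten_exprs(struct_name, field_names):
    """Expressions that pull each field of one struct up to the top level."""
    return ["{0}.{1} AS {0}_{1}".format(struct_name, f) for f in field_names]


def rebuild_expr(struct_name, field_names):
    """named_struct expression that reassembles the struct, fields sorted by name."""
    parts = []
    for f in sorted(field_names):
        parts.append("'{}'".format(f))                 # field name inside the struct
        parts.append("{}_{}".format(struct_name, f))   # flattened source column
    return "named_struct({}) AS {}".format(", ".join(parts), struct_name)


address_fields = ["pin", "city", "street"]
print(flatten_exprs("address", address_fields))
print(rebuild_expr("address", address_fields))
```

The first list would go into one selectExpr call (together with the other top-level columns) and the second expression into a following one; deeper nesting would need the same two steps applied level by level.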