How to sort the columns of a nested struct alphabetically in PySpark?

Question (votes: 0, answers: 3)

I have data with the following schema. I want all of the columns to be sorted alphabetically, in a PySpark DataFrame.

root
 |-- _id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- pin: integer (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- street: string (nullable = true)

The code below sorts only the outer columns, not the nested ones.

>>> cols = df.columns
>>> df2=df[sorted(cols)]
>>> df2.printSchema()

After this code, the schema looks like this:

root
 |-- _id: string (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- pin: integer (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- street: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)

(Because _id starts with an underscore, it appears first.)
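As a quick check in plain Python (no Spark needed), the underscore character sorts before lowercase letters in the default string ordering, which is why _id lands first:

```python
# '_' is ASCII 95, lowercase letters start at ASCII 97 ('a'),
# so "_id" precedes "address" in a plain alphabetical sort.
cols = ["first_name", "last_name", "_id", "address"]
print(sorted(cols))  # ['_id', 'address', 'first_name', 'last_name']
print(ord("_") < ord("a"))  # True
```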

The schema I want is below. (Even the columns inside address should be sorted.)

root
 |-- _id: string (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- pin: integer (nullable = true)
 |    |-- street: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)

Thanks in advance.

python apache-spark struct pyspark
3 Answers

3
votes

Here is a solution that works for arbitrarily deep nesting and does not rely on hard-coding any column names.

For demonstration, I created the following slightly more complex schema, in which the address column contains a second level of nesting: an address.zip StructType field with two out-of-order subfields. Suppose your DataFrame schema is the following:

df.printSchema()
#root
# |-- _id: string (nullable = true)
# |-- first_name: string (nullable = true)
# |-- last_name: string (nullable = true)
# |-- address: struct (nullable = true)
# |    |-- pin: integer (nullable = true)
# |    |-- city: string (nullable = true)
# |    |-- zip: struct (nullable = true)
# |    |    |-- last4: integer (nullable = true)
# |    |    |-- first5: integer (nullable = true)
# |    |-- street: string (nullable = true)

You can define a function that recursively steps through your schema and sorts the fields to build a Spark-SQL select expression:

from pyspark.sql.types import StructType, StructField

def schemaToSelectExpr(schema, baseField=""):
    select_cols = []
    for structField in sorted(schema, key=lambda x: x.name):
        if structField.dataType.typeName() == 'struct':
            subFields = []
            for fld in sorted(structField.jsonValue()['type']['fields'],
                              key=lambda x: x['name']):
                newStruct = StructType([StructField.fromJson(fld)])
                newBaseField = structField.name
                if baseField:
                    newBaseField = baseField + "." + newBaseField
                subFields.extend(schemaToSelectExpr(newStruct, baseField=newBaseField))
            select_cols.append(
                "struct(" + ",".join(subFields) + ") AS {}".format(structField.name)
            )
        else:
            if baseField:
                select_cols.append(baseField + "." + structField.name)
            else:
                select_cols.append(structField.name)
    return select_cols

Running this on this DataFrame's schema yields the following (I have broken the long 'address' string onto two lines for readability):

print(schemaToSelectExpr(df.schema))
#['_id',
# 'struct(address.city,address.pin,address.street,
#     struct(address.zip.first5,address.zip.last4) AS zip) AS address',
# 'first_name',
# 'last_name']

Now use selectExpr to sort the columns:

df = df.selectExpr(schemaToSelectExpr(df.schema))
df.printSchema()
#root
# |-- _id: string (nullable = true)
# |-- address: struct (nullable = false)
# |    |-- city: string (nullable = true)
# |    |-- pin: integer (nullable = true)
# |    |-- street: string (nullable = true)
# |    |-- zip: struct (nullable = false)
# |    |    |-- first5: integer (nullable = true)
# |    |    |-- last4: integer (nullable = true)
# |-- first_name: string (nullable = true)
# |-- last_name: string (nullable = true)

0
votes

The PySpark library quinn implements a sort_columns function that supports sorting nested Struct and Array(Struct) fields. The implementation was originally inspired by @pault's answer. For Scala Spark, spark-daria's sortColumnsBy also supports sorting nested Struct and Array(Struct) fields. The Scala API is more flexible, since it lets you supply a custom sort.

-1
votes

Use the colname.* syntax to flatten your DF. For nested flattening, see: How to flatten a struct in a Spark dataframe? On the flattened DataFrame, you can then create a new struct column with its input columns sorted.
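To see the recursion behind the accepted answer without a running Spark session, here is a small pure-Python analogue. It operates on a plain nested dict standing in for the schema (sort_nested and the dict layout are hypothetical illustrations, not part of the PySpark API), and produces the same select-expression strings:

```python
def sort_nested(schema, base=""):
    """Recursively build select expressions with fields sorted by name.

    `schema` maps field name -> a type string, or a nested dict for a
    struct. Mirrors the recursion in schemaToSelectExpr, using plain
    dicts instead of StructType/StructField.
    """
    exprs = []
    for name in sorted(schema):
        path = f"{base}.{name}" if base else name
        if isinstance(schema[name], dict):
            # Struct field: sort its children recursively, then
            # rebuild it with struct(...) AS <name>.
            inner = sort_nested(schema[name], base=path)
            exprs.append("struct(" + ",".join(inner) + f") AS {name}")
        else:
            exprs.append(path)
    return exprs

schema = {
    "last_name": "string",
    "_id": "string",
    "address": {"pin": "int", "city": "string",
                "zip": {"last4": "int", "first5": "int"}},
    "first_name": "string",
}
print(sort_nested(schema))
# ['_id',
#  'struct(address.city,address.pin,struct(address.zip.first5,address.zip.last4) AS zip) AS address',
#  'first_name',
#  'last_name']
```

The same two cases drive the real function: leaf fields become dotted paths, and struct fields are rebuilt with struct(...) AS name around their recursively sorted children.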
