Let's say I have the following pyspark dataframe in Databricks:
some_other_column | price_history |
---|---|
test1 | [{"date":"2021-03-21T01:20:33Z","price_tag":"N","price":"9.23","price_promotion":"1.34","AKT":false,"my_column":null,"supplier":"some_supplier"}] |
test2 | [{"date":"2021-03-23T01:20:33Z","price_tag":null,"price":"10.40","price_promotion":null,"AKT":true,"my_column":"something","supplier":null}] |
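You can see right away that the price_history column contains a nested structure. But how do I get the code that builds this table? Is there a way to turn this table into actual code that can then be used to create the table again?
The only way I know of to get more information about the table is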
df.schema
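But the resulting schema does not tell me how to manually create a dataframe with complex data structures (structs, MapType, ArrayType, and so on). It would be great if there were a way to do this, because being able to create sample data easily would make writing tests for the code much simpler.
So what I would like to end up with is the following code: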
from datetime import datetime
from decimal import Decimal
from pyspark.sql.types import (StructType, StructField, StringType,
                               ArrayType, TimestampType, BooleanType,
                               DecimalType)

schema = StructType([
    StructField("some_other_column", StringType()),
    StructField('price_history', ArrayType(StructType([
        StructField('date', TimestampType()),
        StructField('price_tag', StringType()),
        StructField('price', DecimalType(12, 2)),
        StructField('price_promotion', DecimalType(12, 2)),
        StructField('AKT', BooleanType()),
        StructField("my_column", StringType()),
        StructField("supplier", StringType()),
    ]))),
])

data = [
    ('test1', [(datetime(2021, 3, 21, 1, 20, 33), 'N',
                Decimal('9.23'), Decimal('1.34'), False, None, 'some_supplier')]),
    ('test2', [(datetime(2021, 3, 23, 1, 20, 33), None,
                Decimal('10.40'), None, True, 'something', None)]),
]

df = spark.createDataFrame(schema=schema, data=data)
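I don't know of any solution that generates the code for building a table. Maybe ChatGPT can help with that, but here is an example of creating an array of structs from flat columns: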
import pyspark.sql.functions as f
from pyspark.sql import SparkSession
import datetime

spark = SparkSession.builder.getOrCreate()

# Flat input: one row per price entry
initial_df = spark.createDataFrame([
    ('test1', datetime.datetime(2024, 1, 8, 12), 'N'),
    ('test1', datetime.datetime(2024, 1, 8, 13, 30), 'Y'),
    ('test2', datetime.datetime(2024, 1, 8, 14), 'N')
], ['some_other_column', 'date', 'price_tag'])

# Pack the flat columns into a struct, then collect the structs per key
refined_df = (
    initial_df
    .withColumn('price_detail', f.struct(*[f.col(element) for element in ['date', 'price_tag']]))
    .groupBy('some_other_column')
    .agg(f.collect_list(f.col('price_detail')).alias('price_history'))
)

refined_df.show(truncate=False)
refined_df.printSchema()
Output:
+-----------------+----------------------------------------------------+
|some_other_column|price_history                                       |
+-----------------+----------------------------------------------------+
|test1            |[{2024-01-08 12:00:00, N}, {2024-01-08 13:30:00, Y}]|
|test2            |[{2024-01-08 14:00:00, N}]                          |
+-----------------+----------------------------------------------------+
root
 |-- some_other_column: string (nullable = true)
 |-- price_history: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- date: timestamp (nullable = true)
 |    |    |-- price_tag: string (nullable = true)
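If the goal is only to get the schema back as code, one possible approach is to generate the source text from the schema's JSON form. The sketch below is my own hypothetical helper (type_to_code is not part of PySpark), and it assumes the input dict has the shape returned by df.schema.jsonValue(). It handles only the common simple types plus decimal, array, map and struct, and it drops the nullable/containsNull flags, so everything falls back to the PySpark defaults. It does not generate the data rows.

```python
# Mapping from Spark's JSON type names to pyspark.sql.types class names.
# Assumption: only these simple types are needed; extend as required.
SIMPLE_TYPES = {
    "string": "StringType",
    "boolean": "BooleanType",
    "timestamp": "TimestampType",
    "date": "DateType",
    "integer": "IntegerType",
    "long": "LongType",
    "double": "DoubleType",
    "float": "FloatType",
    "binary": "BinaryType",
}


def type_to_code(spark_type, indent=0):
    """Return Python source text for one type taken from a schema's JSON form.

    Hypothetical helper, not a PySpark API. Nullability flags are ignored.
    """
    pad = "    " * indent
    if isinstance(spark_type, str):
        if spark_type.startswith("decimal("):
            # "decimal(12,2)" -> "DecimalType(12, 2)"
            precision, scale = spark_type[len("decimal("):-1].split(",")
            return f"DecimalType({int(precision)}, {int(scale)})"
        return f"{SIMPLE_TYPES[spark_type]}()"
    kind = spark_type["type"]
    if kind == "array":
        return f"ArrayType({type_to_code(spark_type['elementType'], indent)})"
    if kind == "map":
        key = type_to_code(spark_type["keyType"], indent)
        value = type_to_code(spark_type["valueType"], indent)
        return f"MapType({key}, {value})"
    if kind == "struct":
        # One StructField per line, nested structs indented one level deeper
        lines = ["StructType(["]
        for field in spark_type["fields"]:
            inner = type_to_code(field["type"], indent + 1)
            lines.append(f"{pad}    StructField({field['name']!r}, {inner}),")
        lines.append(f"{pad}])")
        return "\n".join(lines)
    raise ValueError(f"unsupported type: {kind}")


# Hand-written stand-in for df.schema.jsonValue(), so no Spark session is needed here
schema_json = {
    "type": "struct",
    "fields": [
        {"name": "some_other_column", "type": "string", "nullable": True, "metadata": {}},
        {"name": "price_history", "nullable": True, "metadata": {},
         "type": {"type": "array", "containsNull": True, "elementType": {
             "type": "struct", "fields": [
                 {"name": "date", "type": "timestamp", "nullable": True, "metadata": {}},
                 {"name": "price", "type": "decimal(12,2)", "nullable": True, "metadata": {}},
             ]}}},
    ],
}

generated = "schema = " + type_to_code(schema_json)
print(generated)
```

On a real dataframe you would pass df.schema.jsonValue() instead of the hand-written dict and paste the printed text, together with the matching imports from pyspark.sql.types, into your test.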