I have JSON data in which location is an array column exported with the following values:
["USA", "China", "India", "UK"]
["Nepal", "China", "India", "UK", "Japan"]
I need a simple SQL query to explode the array column and then pivot it into a dynamic number of columns based on the number of values in the array, something like pivot(explode(from_json(jsondata:export.location, 'array'))) as loc_:
select from_json(jsondata:export.location, 'array') AS Location
pivot(explode(from_json(jsondata:export.location, 'array'))) as loc_,
from my_table
Input
| Location |
| -------- |
| ["China", "India", "UK"] |
| ["China", "India", "UK", "Japan"] |
Output
| Location | loc_1 | loc_2 | loc_3 | loc_4 |
| --- | --- | --- | --- | --- |
| ["China", "India", "UK"] | "China" | "India" | "UK" | |
| ["China", "India", "UK", "Japan"] | "China" | "India" | "UK" | "Japan" |
You can try applying posexplode together with a window function to create a unique identifier that groups the exploded elements of each original array row back together.
import sys

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# Create (or reuse) a SparkSession
spark = SparkSession.builder.getOrCreate()

# Define the data
data = [
    (["China", "India", "UK"],),
    (["China", "India", "UK", "Japan"],)
]
# Define the schema
schema = StructType([
    StructField("Location", ArrayType(StringType()))
])
# Create a DataFrame
df = spark.createDataFrame(data, schema=schema)
df.show(df.count(), False)
# Explode the 'Location' column
print("Explode the 'Location' column")
df = df.select(F.posexplode(F.col('Location')).alias('loc_', 'Location'))
df.show(df.count(), False)
# Create a column that identifies each list of locations using a pseudo-unique identifier
df = df.withColumn('id', F.when(F.col('loc_') == 0, F.monotonically_increasing_id()).otherwise(None))
# Forward fill the null values in the 'id' column using the -sys.maxsize window
window = Window.rowsBetween(-sys.maxsize, 0)
df = df.withColumn('id', F.last('id', ignorenulls=True).over(window))
# Recreate the 'Location' column
df = df.groupBy('id').agg(F.collect_list('Location').alias('Location'), F.max('loc_').alias('max_loc_'))
# Explode the 'Location' column and filter the DataFrame to give the values that have the location index
# less than or equal to the maximum location index
df = df.select('id', 'Location', F.posexplode('Location').alias('loc_', 'exploded_Location'))
df = df.filter(F.col('loc_') <= F.col('max_loc_'))
# Pivot the DataFrame
df_pivot = df.groupBy('id', 'Location').pivot('loc_').agg(F.first('exploded_Location'))
# Show the pivoted DataFrame
df_pivot.show(df_pivot.count(), False)
This gives the following result:
+---+-------------------------+-----+-----+---+-----+
|id |Location |0 |1 |2 |3 |
+---+-------------------------+-----+-----+---+-----+
|0 |[China, India, UK] |China|India|UK |null |
|1 |[China, India, UK, Japan]|China|India|UK |Japan|
+---+-------------------------+-----+-----+---+-----+
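To get the loc_1 … loc_N headers from the desired output instead of the raw position numbers, the pivoted columns can be renamed afterwards. A minimal sketch, assuming the df_pivot produced above, whose position columns come out named '0', '1', '2', ...:
# Rename the numeric pivot columns 0, 1, 2, ... to loc_1, loc_2, loc_3, ...
pos_cols = [c for c in df_pivot.columns if c not in ('id', 'Location')]
df_named = df_pivot.select(
    'Location',
    *[F.col(c).alias(f'loc_{int(c) + 1}') for c in pos_cols]
)
df_named.show(df_named.count(), False)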
You can do something similar to split the array column into separate columns:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructField, StringType, ArrayType, StructType
# Create a SparkSession
spark = SparkSession.builder \
    .appName("Split Array Column Example") \
    .master('local') \
    .getOrCreate()
# Define the data
data = [
    (["China", "India", "UK"],),
    (["China", "India", "UK", "Japan"],)
]
# Define the schema
schema = StructType([
    StructField("Location", ArrayType(StringType()))
])
# Create a DataFrame
df = spark.createDataFrame(data, schema=schema)
# Determine the maximum array length
max_length = df.selectExpr("max(size(Location))").collect()[0][0]
# Split the array column into individual columns
df_split = df.select('Location', *[
    col("Location")[i].alias(f"Location_{i+1}")
    for i in range(max_length)
])
# Show the DataFrame with split columns
df_split.show()
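If your starting point is the raw JSON string from the question rather than an already-parsed array, the same split can be applied after parsing it first. This is only a sketch: the jsondata column name, the export.location path and the sample rows are assumptions taken from the question, and the DDL schema string may need adjusting to the real payload.
from pyspark.sql import functions as F

# Hypothetical raw JSON rows matching the question's jsondata:export.location path
json_rows = [
    ('{"export": {"location": ["China", "India", "UK"]}}',),
    ('{"export": {"location": ["China", "India", "UK", "Japan"]}}',),
]
df_json = spark.createDataFrame(json_rows, ["jsondata"])

# Parse the JSON string and pull the array out of export.location
df_loc = df_json.select(
    F.from_json("jsondata", "export STRUCT<location: ARRAY<STRING>>")
     .getField("export").getField("location").alias("Location")
)

# Reuse the dynamic-split idea: one loc_ column per position, up to the longest array
max_length = df_loc.selectExpr("max(size(Location))").collect()[0][0]
df_loc.select(
    "Location",
    *[F.col("Location")[i].alias(f"loc_{i + 1}") for i in range(max_length)]
).show(truncate=False)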