Split a DataFrame column based on another column - "Column is not iterable"

Problem description

I have a DataFrame with two columns: "id" (int) and "values" (an array of structs). I need to split each "name" string on a known prefix. I have a list of column-name prefixes to use as delimiters: for each name, I check which prefix from the list occurs in it and, if one is present, split the string at that prefix.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, regexp_extract, split, concat, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType

spark = SparkSession.builder.getOrCreate()

value_schema = ArrayType(
    StructType([
        StructField("name", StringType(), True),
        StructField("location", StringType(), True)
    ])
)

data = [
    (1, [
        {"name": "col1_US_value_name", "location": "usa"},
        {"name": "col2_name_plex", "location": "usa"},
        {"name": "col4_false_val", "location": "usa"},
        {"name": "col3_name_is_fantasy", "location": "usa"}
    ])
]


schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("values", value_schema, True)
])
df = spark.createDataFrame(data, schema)

# Explode the array so each struct becomes its own row, then keep only the name
df = df.withColumn("values", explode(col("values")))
df = df.select(col("id"), col("values.name").alias("name"))
df.display()


col_names = ["col1","col2_name","col3_name_is","col4"]

pattern = "|".join(col_names)
print(pattern)
df = df.withColumn("new_name", regexp_extract("name", pattern, 0))
df.display()

df = df.withColumn(
    "new_value",
    split(df["name"], concat(df["new_name"], lit("_"))).getItem(1)  # also tried split(...)[1]; both raise "Column is not iterable"
)
df.display()
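This fails because pyspark.sql.functions.split expects the split pattern as a plain Python string, not a Column, so a pattern that varies per row cannot be passed through the Python API. A minimal sketch of a workaround, assuming Spark SQL's split and concat (which do evaluate their arguments per row) and that the prefixes contain no regex metacharacters:

from pyspark.sql.functions import expr

# The pattern is evaluated per row inside the SQL expression,
# so new_name can differ from one row to the next
df = df.withColumn("new_value", expr("split(name, concat(new_name, '_'))[1]"))
df.display()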

Expected result:

id   name                  new_name      new_value
1    col1_US_value_name    col1          US_value_name
1    col2_name_plex        col2_name     plex
1    col4_false_val        col4          false_val
1    col3_name_is_fantasy  col3_name_is  fantasy
dataframe apache-spark pyspark split
1 Answer

Try the code below. It cross-joins every name with every candidate prefix, keeps only the rows where the prefix actually occurs in the name, and strips the leading underscore from what remains.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, regexp_replace
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType

spark = SparkSession.builder.getOrCreate()

value_schema = ArrayType(
    StructType([
        StructField("name", StringType(), True),
        StructField("location", StringType(), True)
    ])
)

data = [
    (1, [
        {"name": "col1_US_value_name", "location": "usa"},
        {"name": "col2_name_plex", "location": "usa"},
        {"name": "col4_false_val", "location": "usa"},
        {"name": "col3_name_is_fantasy", "location": "usa"}
    ])
]


schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("values", value_schema, True)
])
df = spark.createDataFrame(data, schema)

df = df.withColumn("values", explode(col("values")))
df = df.select(col("id"), col("values.name").alias("name"))
df.show(truncate=False)


col_names = ["col1", "col2_name", "col3_name_is", "col4"]
df_cols = spark.createDataFrame([[c] for c in col_names], ["new_name"])

# Cross-join every name with every candidate prefix
final_df = df.join(df_cols, how="outer")
# Strip the prefix; if nothing changed, the prefix did not occur in the name
final_df = final_df.withColumn("new_values", regexp_replace(col("name"), col("new_name"), ""))
final_df = final_df.filter(col("name") != col("new_values"))
# Drop the leading underscore left at the split point
final_df = final_df.withColumn("new_value1", regexp_replace(col("new_values"), "^_", ""))
final_df.display()
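Two caveats: regexp_replace only accepts Column arguments for the pattern from Spark 3.0 onwards, and a name matching more than one prefix would survive the filter more than once. As an alternative sketch that avoids the join entirely, the regexp_extract idea from the question can produce both columns in one pass with an anchored pattern; sorting the prefixes longest-first keeps a shorter prefix from shadowing a longer one that shares the same start (e.g. a hypothetical col2 next to col2_name):

from pyspark.sql.functions import regexp_extract

# Longest alternatives first, so the regex prefers the most specific prefix
pattern = "^(" + "|".join(sorted(col_names, key=len, reverse=True)) + ")_(.*)$"

result = (df
    .withColumn("new_name", regexp_extract("name", pattern, 1))
    .withColumn("new_value", regexp_extract("name", pattern, 2)))
result.display()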