我有一个包含 2 列的数据框:"id"(int)和 "values"(结构体 struct 的列表)。我需要根据 name 字段进行拆分。我有一个列名列表,用作分隔符。我需要检查 name 中是否出现了列表中的某个列名;如果出现了其中一个列名,则按该列名拆分数据框中的 name 值。
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    concat,
    explode,
    expr,
    lit,
    regexp_extract,
    regexp_replace,
    split,
)
from pyspark.sql.types import (
    ArrayType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)
# Build a sample DataFrame, then split every "name" into the known prefix it
# starts with ("new_name") and the text after the following underscore
# ("new_value").

# Reuse the session that is already running (e.g. in a notebook) or start one.
spark = SparkSession.builder.getOrCreate()

# "values" holds an array of {name, location} structs.
value_schema = ArrayType(
    StructType([
        StructField("name", StringType(), True),
        StructField("location", StringType(), True),
    ])
)
# Pass the schema explicitly: with default inference, Python dicts become map
# columns instead of the structs declared above.
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("values", value_schema, True),
])
data = [
    (1, [
        {"name": "col1_US_value_name", "location": "usa"},
        {"name": "col2_name_plex", "location": "usa"},
        {"name": "col4_false_val", "location": "usa"},
        {"name": "col3_name_is_fantasy", "location": "usa"},
    ])
]
df = spark.createDataFrame(data, schema)

# One output row per struct in "values"; keep only the struct's name.
df = df.withColumn("values", explode(col("values")))
df = df.select(col("id"), col("values.name").alias("name"))
df.display()

# Known prefixes that act as delimiters. Regex alternation is tried left to
# right, so list a longer name before any other name that is a prefix of it.
col_names = ["col1", "col2_name", "col3_name_is", "col4"]
pattern = "|".join(col_names)
print(pattern)

# Group 0 is the whole match, i.e. whichever prefix occurs in "name"
# (an empty string when none does).
df = df.withColumn("new_name", regexp_extract("name", pattern, 0))
df.display()

# Before PySpark 4.0, functions.split() only takes a plain-string pattern;
# passing a Column fails with "Column is not iterable". The SQL form of split
# accepts an expression as the delimiter, so build it through expr(). The
# limit of 2 cuts only at the first delimiter; index 1 is the text after it.
df = df.withColumn(
    "new_value",
    expr("split(name, concat(new_name, '_'), 2)[1]"),
)
df.display()
预期结果:
id name new_name new_value
1 col1_US_value_name col1 US_value_name
1 col2_name_plex col2_name plex
1 col4_false_val col4 false_val
1 col3_name_is_fantasy col3_name_is fantasy
尝试下面的代码。
# Alternative approach: pair every name with every candidate prefix, keep the
# pairs where the name starts with that prefix, and strip the prefix off.

# Reuse the session that is already running (e.g. in a notebook) or start one.
spark = SparkSession.builder.getOrCreate()

# "values" holds an array of {name, location} structs.
value_schema = ArrayType(
    StructType([
        StructField("name", StringType(), True),
        StructField("location", StringType(), True),
    ])
)
# Pass the schema explicitly: with default inference, Python dicts become map
# columns instead of the structs declared above.
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("values", value_schema, True),
])
data = [
    (1, [
        {"name": "col1_US_value_name", "location": "usa"},
        {"name": "col2_name_plex", "location": "usa"},
        {"name": "col4_false_val", "location": "usa"},
        {"name": "col3_name_is_fantasy", "location": "usa"},
    ])
]
df = spark.createDataFrame(data, schema)

# One output row per struct in "values"; keep only the struct's name.
df = df.withColumn("values", explode(col("values")))
df = df.select(col("id"), col("values.name").alias("name"))
# NOTE(review): display() is supplied by the notebook environment; confirm it
# accepts the truncate keyword there.
df.display(truncate=False)

# Known prefixes, turned into a one-column DataFrame so they can be joined.
col_names = ["col1", "col2_name", "col3_name_is", "col4"]
df_cols = spark.createDataFrame([(name,) for name in col_names], ["new_name"])

# Every (name, candidate prefix) pair. crossJoin states the cartesian product
# explicitly instead of relying on an outer join without a join condition.
final_df = df.crossJoin(df_cols)

# Remove the candidate prefix, but only at the start of the name; unanchored,
# the replacement would also hit the same text in the middle of a name.
final_df = final_df.withColumn(
    "new_values",
    regexp_replace(col("name"), concat(lit("^"), col("new_name")), ""),
)
# A pair whose name is unchanged did not start with that prefix; drop it.
final_df = final_df.filter(col("name") != col("new_values"))
# Drop the underscore that separated the prefix from the rest.
final_df = final_df.withColumn(
    "new_value1", regexp_replace(col("new_values"), "^_", "")
)
final_df.display()