有什么方法可以将新列附加到现有的镶木地板文件中吗?
我目前正在参加 Kaggle 比赛,我已将所有数据转换为 Parquet 文件。
在这种情况下,我将 parquet 文件读入 pyspark DataFrame,进行了一些特征提取,并使用
将新列附加到 DataFramepysaprk.DataFrame.withColumn()。
之后,我想将新列保存在源镶木地板文件中。
我知道 Spark SQL 带有 Parquet 模式演化,但该示例仅显示了带有键值的情况。
镶木地板“追加”模式也不起作用。它仅将新行追加到镶木地板文件中。 是否可以将新列附加到现有镶木地板文件而不是再次生成整个表? 或者我必须生成一个单独的新镶木地板文件并在运行时加入它们。
虽然这个问题已经发布2年了,但仍然没有答案,让我自己回答我自己的问题。
我还在使用Spark的时候,Spark的版本是1.4。我不喜欢新版本,但对于该版本,向镶木地板文件添加新列是不可能的。
在 parquet 中,您不修改文件,而是读取它们、修改它们并将它们写回,您不能只更改需要读取和写入完整文件的列。
是的,Databricks Delta 以及parquet 桌子都可以。下面给出了一个例子:-
这个例子是用 python (pySpark) 写的
df = sqlContext.createDataFrame([('1','Name_1','Address_1'),('2','Name_2','Address_2'),('3','Name_3','Address_3')], schema=['ID', 'Name', 'Address'])
delta_tblNm = 'testDeltaSchema.test_delta_tbl'
parquet_tblNm = 'testParquetSchema.test_parquet_tbl'
delta_write_loc = 'dbfs:///mnt/datalake/stg/delta_tblNm'
parquet_write_loc = 'dbfs:///mnt/datalake/stg/parquet_tblNm'
# DELTA TABLE
df.write.format('delta').mode('overwrite').option('overwriteSchema', 'true').save(delta_write_loc)
spark.sql(" create table if not exists {} using DELTA LOCATION '{}'".format(delta_tblNm, delta_write_loc))
spark.sql("refresh table {}".format(print(cur_tblNm)))
# PARQUET TABLE
df.write.format("parquet").mode("overwrite").save(parquet_write_loc)
spark.sql("""CREATE TABLE if not exists {} USING PARQUET LOCATION '{}'""".format(parquet_tblNm, parquet_write_loc))
spark.sql(""" REFRESH TABLE {} """.format(parquet_tblNm))
test_df = spark.sql("select * testDeltaSchema.test_delta_tbl")
test_df.show()
test_df = spark.sql("select * from testParquetSchema.test_parquet_tbl")
test_df.show()
test_df = spark.sql("ALTER TABLE testDeltaSchema.test_delta_tbl ADD COLUMNS (Mob_number String COMMENT 'newCol' AFTER Address)")
test_df.show()
test_df = spark.sql("ALTER TABLE testParquetSchema.test_parquet_tbl ADD COLUMNS (Mob_number String COMMENT 'newCol' AFTER Address)")
test_df.show()
可以这样做,但这取决于你是否要回填数据。 Spark 3.5.0、Delta Lake 3.2.0、Python 3.10 的快速示例。 模式演变的前进基础:
from pyspark.sql.functions import lit, when
parquet_path = "s3://my_bucket/test.parquet"
df = spark.range(30)
df.write.mode("overwrite").parquet(parquet_path)
df.withColumn("new_col", lit("new_value")).write.mode("append").parquet(parquet_path)
spark.range(30).write.mode("append").parquet(parquet_path)
spark.read.parquet(parquet_path).filter("new_col IS NULL").count()
如果您想回填旧的列值:
spark.read.parquet(parquet_path)
.withColumn(
"new_col",
when(
col("new_col").isNull(),
..., # Your substitution logic here
).otherwise(col("new_col")
).write.mode("overwrite").parquet(other_path)