将新列附加到现有镶木地板文件

问题描述 投票:0回答:4

有什么方法可以将新列附加到现有的镶木地板文件中吗?

我目前正在参加 Kaggle 比赛,我已将所有数据转换为 Parquet 文件。

在这种情况下,我将 parquet 文件读入 pyspark DataFrame,进行了一些特征提取,并使用

将新列附加到 DataFrame

pysaprk.DataFrame.withColumn()。

之后,我想将新列保存在源镶木地板文件中。

我知道 Spark SQL 带有 Parquet 模式演化,但该示例仅显示了带有键值的情况。

镶木地板“追加”模式也不起作用。它仅将新行追加到镶木地板文件中。 是否可以将新列附加到现有镶木地板文件而不是再次生成整个表? 或者我必须生成一个单独的新镶木地板文件并在运行时加入它们。

apache-spark apache-spark-sql parquet
4个回答
8
投票

虽然这个问题已经发布2年了,但仍然没有答案,让我自己回答我自己的问题。

我还在使用Spark的时候,Spark的版本是1.4。我不喜欢新版本,但对于该版本,向镶木地板文件添加新列是不可能的。


7
投票

在 parquet 中,您不修改文件,而是读取它们、修改它们并将它们写回,您不能只更改需要读取和写入完整文件的列。


4
投票

是的,Databricks Delta 以及parquet 桌子都可以。下面给出了一个例子:-

这个例子是用 python (pySpark) 写的

df = sqlContext.createDataFrame([('1','Name_1','Address_1'),('2','Name_2','Address_2'),('3','Name_3','Address_3')], schema=['ID', 'Name', 'Address'])

delta_tblNm = 'testDeltaSchema.test_delta_tbl'
parquet_tblNm = 'testParquetSchema.test_parquet_tbl'

delta_write_loc = 'dbfs:///mnt/datalake/stg/delta_tblNm'
parquet_write_loc = 'dbfs:///mnt/datalake/stg/parquet_tblNm'


# DELTA TABLE
df.write.format('delta').mode('overwrite').option('overwriteSchema', 'true').save(delta_write_loc)
spark.sql(" create table if not exists {} using DELTA LOCATION '{}'".format(delta_tblNm, delta_write_loc))
spark.sql("refresh table {}".format(print(cur_tblNm)))

# PARQUET TABLE
df.write.format("parquet").mode("overwrite").save(parquet_write_loc)
spark.sql("""CREATE TABLE if not exists {} USING PARQUET LOCATION '{}'""".format(parquet_tblNm, parquet_write_loc))
spark.sql(""" REFRESH TABLE {} """.format(parquet_tblNm))

test_df = spark.sql("select * testDeltaSchema.test_delta_tbl")
test_df.show()

test_df = spark.sql("select * from testParquetSchema.test_parquet_tbl")
test_df.show()

test_df = spark.sql("ALTER TABLE  testDeltaSchema.test_delta_tbl ADD COLUMNS (Mob_number String COMMENT 'newCol' AFTER Address)")
test_df.show()

test_df = spark.sql("ALTER TABLE  testParquetSchema.test_parquet_tbl ADD COLUMNS (Mob_number String COMMENT 'newCol' AFTER Address)")
test_df.show()

0
投票

可以这样做,但这取决于你是否要回填数据。 Spark 3.5.0、Delta Lake 3.2.0、Python 3.10 的快速示例。 模式演变的前进基础:

from pyspark.sql.functions import lit, when
parquet_path = "s3://my_bucket/test.parquet"
df = spark.range(30)
df.write.mode("overwrite").parquet(parquet_path)
df.withColumn("new_col", lit("new_value")).write.mode("append").parquet(parquet_path)
spark.range(30).write.mode("append").parquet(parquet_path)
spark.read.parquet(parquet_path).filter("new_col IS NULL").count()

如果您想回填旧的列值:

spark.read.parquet(parquet_path)
.withColumn(
   "new_col",
   when(
       col("new_col").isNull(), 
       ..., # Your substitution logic here
   ).otherwise(col("new_col")
).write.mode("overwrite").parquet(other_path)
© www.soinside.com 2019 - 2024. All rights reserved.