我有一个在 Glue 上运行的 pyspark 作业。我的工作处理数据并将其保存为 Apache Iceberg。问题是,保存表在分区内生成多个小文件。我测试了几种保存数据的方法,但没有解决。这是我截取的代码。
import pyspark.sql.functions as f
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import DataFrame
conf = (
SparkConf()
.setAppName(APP_NAME)
.set(
"spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
)
.set("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
.set("spark.sql.catalog.glue_catalog.warehouse", BRONZE_PATH)
.set(
"spark.sql.catalog.glue_catalog.catalog-impl",
"org.apache.iceberg.aws.glue.GlueCatalog",
)
.set("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
.set("spark.sql.shuffle.partitions", "100")
)
sc = SparkContext(conf=conf)
glueContext = GlueContext(sc)
glue_db = glueContext.create_dynamic_frame.from_catalog(database=DATABASE_NAME, table_name=LANDING_TABLE_NAME)
df = glue_db.toDF()
df.createOrReplaceTempView(APP_NAME)
# do all processing here...
df = df.sortWithinPartitions("issueid")
(
df.writeTo(f"glue_catalog.bronze_{ENVIRONMENT}.{BRONZE_TABLE_NAME}")
.using("iceberg")
.tableProperty("format-version", "2")
.tableProperty("location", BRONZE_PATH + BRONZE_TABLE_OUTPUT)
.tableProperty("write.distribution.mode", "hash")
.tableProperty("write.target-file-size-bytes", "536870912")
.partitionedBy("issueid")
.createOrReplace()
)
我的输出如下所示:
所需的输出我想要:每个分区一个压缩文件。
我怎样才能实现这个目标?
做重新分区对我有用:
df = df.repartition("issueid").sortWithinPartitions("issueid")
不确定这是否是最好的解决方案。请随时改进。