PySpark with Iceberg saves many small files


I have a PySpark job running on AWS Glue. The job processes data and saves it as an Apache Iceberg table. The problem is that writing the table produces many small files within each partition. I have tested several ways of saving the data, but none of them fixed it. Here is a snippet of my code:

import pyspark.sql.functions as f
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import DataFrame

# Spark config: register Iceberg's SQL extensions and a Glue-backed catalog
conf = (
    SparkConf()
    .setAppName(APP_NAME)
    .set(
        "spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    )
    .set("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .set("spark.sql.catalog.glue_catalog.warehouse", BRONZE_PATH)
    .set(
        "spark.sql.catalog.glue_catalog.catalog-impl",
        "org.apache.iceberg.aws.glue.GlueCatalog",
    )
    .set("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .set("spark.sql.shuffle.partitions", "100")
)
sc = SparkContext(conf=conf)
glueContext = GlueContext(sc)
# Read the landing table from the Glue Data Catalog and convert it to a DataFrame
glue_db = glueContext.create_dynamic_frame.from_catalog(database=DATABASE_NAME, table_name=LANDING_TABLE_NAME)
df = glue_db.toDF()
df.createOrReplaceTempView(APP_NAME)

# do all processing here...

# Sort rows within each Spark partition, then write an Iceberg v2 table partitioned by issueid
df = df.sortWithinPartitions("issueid")
(
    df.writeTo(f"glue_catalog.bronze_{ENVIRONMENT}.{BRONZE_TABLE_NAME}")
    .using("iceberg")
    .tableProperty("format-version", "2")
    .tableProperty("location", BRONZE_PATH + BRONZE_TABLE_OUTPUT)
    .tableProperty("write.distribution.mode", "hash")
    .tableProperty("write.target-file-size-bytes", "536870912")
    .partitionedBy("issueid")
    .createOrReplace()
)

My output looks like this:

[screenshot: each partition directory contains many small data files]

Desired output: one compacted file per partition.

How can I achieve this?

pyspark aws-glue apache-iceberg
1 Answer

Repartitioning worked for me:

df = df.repartition("issueid").sortWithinPartitions("issueid")
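
This works because repartition("issueid") shuffles all rows with the same issueid into the same Spark task, so every Iceberg table partition is written by exactly one task and ends up as a single file (as long as one issueid's data stays below write.target-file-size-bytes).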

I'm not sure this is the best solution, so feel free to improve on it.
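
If the table already contains small files, another option is Iceberg's built-in compaction procedure. Below is a minimal sketch, assuming the Iceberg SQL extensions are enabled (as in the question's SparkConf) and reusing the question's catalog and table names:

# Get the SparkSession that backs the GlueContext
spark = glueContext.spark_session

# Compact existing data files with Iceberg's rewrite_data_files procedure,
# targeting the same 512 MiB file size used at write time
spark.sql(f"""
    CALL glue_catalog.system.rewrite_data_files(
        table => 'bronze_{ENVIRONMENT}.{BRONZE_TABLE_NAME}',
        options => map('target-file-size-bytes', '536870912')
    )
""")

This runs as a post-write maintenance step, so it can also be scheduled separately from the ingest job.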
