我在基于 HDFS 的数据湖上使用 PySpark 和 Apache Iceberg,并且遇到了严重的存储问题。我的应用程序每秒都会摄取实时数据。大约 2 小时后,我收到一条错误消息,指示存储已满。在研究 HDFS 文件夹(同时存储数据和元数据)时,我注意到与实际数据相比,Iceberg 的元数据消耗了惊人的大量存储空间。
这是我的 Spark 配置:
CONF = (
pyspark.SparkConf()
.set("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0")
.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
.set("spark.sql.catalog.spark_catalog.type", "hive")
.set("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
.set("spark.sql.catalog.local.type", "hadoop")
.set("spark.sql.catalog.local.warehouse", 'warehouse')
.set("spark.sql.defaultCatalog", "local")
.set("spark.driver.port", "7070")
.set("spark.ui.port", "4040")
)
我的建表代码:
def get_create_channel_data_table_query() -> str:
return f"""
CREATE TABLE IF NOT EXISTS local.db.{CHANNEL_DATA_TABLE} (
uuid STRING,
channel_set_channel_uuid STRING,
data FLOAT,
file_uri STRING,
index_depth FLOAT,
index_time BIGINT,
creation TIMESTAMP,
last_update TIMESTAMP
)
"""
向表中插入数据:
def insert_row_into_table(spark: SparkSession, schema: StructType, table_name: str, row: tuple):
df = spark.createDataFrame([row], schema)
df.writeTo(f"local.db.{table_name}").append()
问题:
Iceberg 的元数据增长迅速,相对于数据占用了大量的空间。我不确定为什么元数据消耗如此之高。
问题:
是什么导致元数据消耗如此多的空间?我可以应用哪些最佳实践或配置来减少元数据存储?
扩展我上面的评论,来自 Iceberg 文档:
对 Iceberg 表的每次写入都会创建一个新的快照或版本 一张桌子。快照可用于时间旅行查询,或者表 可以回滚到任何有效的快照。
因此,对于包含大量分区/文件的表,冰山清单的总大小会增加得相当快,特别是如果您每秒进行写入/提交,我不会感到惊讶。
您可以按照上述文档减少存储版本的数量,并继续清除旧元数据并删除孤立文件。
但我仍然认为 Iceberg 不是适合秒级流数据的存储层,其他存储机制(HBase、Kudu、Solr(?))会更合适。