微软文档在这里: https://learn.microsoft.com/en-us/azure/databricks/kb/sql/find-size-of-table#size-of-a-delta-table 建议两种方法:
方法一:
import com.databricks.sql.transaction.tahoe._
val deltaLog = DeltaLog.forTable(spark, "dbfs:/<path-to-delta-table>")
val snapshot = deltaLog.snapshot // the current delta table snapshot
println(s"Total file size (bytes): ${deltaLog.snapshot.sizeInBytes}")`
方法二:
spark.read.table("<non-delta-table-name>").queryExecution.analyzed.stats
对于我的表,它们都返回 ~300 MB。
但是在存储资源管理器文件夹统计信息或递归 dbutils.fs.ls 中,我得到了 ~900MB。
因此,这两种方法比逐字查看每个文件要快得多,漏报率低了 67%。 使用较慢的方法就可以了,除非我尝试扩展到整个容器,扫描所有 10 亿个文件和 2.6 PB 需要 55 小时。
那么在 ADLS Gen 2 中获取表大小的最佳方法是什么? 如果它适用于不是表格的文件夹,那就加分了,因为这确实是我需要的数字。 dbutils.fs.ls 是单线程的,仅适用于驱动程序,因此它甚至不是非常可并行的。 它可以是螺纹的,但只能在驱动程序内。
deltaLog.snapshot
仅返回当前快照。您可以在表的目录中存在更多文件,这些文件属于已从当前快照中删除/替换的历史版本。
它还返回 0,而不会抱怨非增量路径。所以我使用这段代码来获取数据库级别的摘要:
import com.databricks.sql.transaction.tahoe._
val databasePath = "dbfs:/<path-to-database>"
def size(path: String): Long =
dbutils.fs.ls(path).map { fi => if (fi.isDir) size(fi.path) else fi.size }.sum
val tables = dbutils.fs.ls(databasePath).par.map { fi =>
val totalSize = size(fi.path)
val snapshotSize = DeltaLog.forTable(spark, fi.path).snapshot.sizeInBytes
(fi.name, totalSize / 1024 / 1024 / 1024, snapshotSize / 1024 / 1024 / 1024)
}
display(tables.seq.sorted.toDF("name", "total_size_gb", "snapshot_size_gb"))
这仅在驱动程序上并行化,但它仍然只是文件列表,因此速度相当快。我承认我没有十亿个文件,但是好吧,如果它对你来说很慢,只需使用更大的驱动程序并调整线程数。
要获取当前版本表格的大小,可以使用
byte_size = spark.sql("describe detail customers").select("sizeInBytes").collect()
byte_size = (byte_size[0]["sizeInBytes"])
kb_size = byte_size/1024
mb_size = kb_size/1024
tb_size = mb_size/1024
print(f"Current table snapshot size is {byte_size}bytes or {kb_size}KB or {mb_size}MB or {tb_size}TB")
要获取表的大小,包括所有尚未清理的历史文件/版本,您可以尝试执行以下命令并参考统计行
DESCRIBE EXTENDED customers
因此,要提取表格大小,您可以使用:
spark.sql("DESCRIBE EXTENDED customers").createOrReplaceTempView("stats_table")
stats = spark.sql("select data_type from stats_table where col_name = 'Statistics'").collect()
print(stats[0]["data_type"])
如果您手动迭代所有文件,这将给出相同的值。上面的代码片段应该比遍历 ADLS 中的所有文件和子目录要快得多。您所需要做的就是输入表格列表。您甚至可以使用下面的代码片段提取表格列表。
#get list of all tables
catalog_name = "spark_catalog"
schema_list = spark.sql(f"show schemas in {catalog_name}").select('databaseName').rdd.map(lambda x : x[0]).collect()
for database_name in schema_list:
table_list = spark.sql(f"show tables from {database_name}").select('tableName').rdd.map(lambda x : x[0]).collect()
for table in table_list:
print(f"{database_name}.{table}")
你能用你的大桌子尝试一下并分享性能吗?
%python
spark.sql("describe detail schema.delta_table_name").select("sizeInBytes").collect()