Azure Databricks: count rows in all tables - is there a better way?

Question · votes: 0 · answers: 3

I'm trying to find the best way to get row counts for all of my Databricks tables. This is what I came up with:

# Missing from the snippet as posted; implied by the loop below
dvdbs = sqlContext.sql("show databases")

# Collect the matching tables from every database into one DataFrame
for row in dvdbs.rdd.collect():
    tmp = "show tables from " + row['databaseName'] + " like 'xxx*'"
    if row['databaseName'] == 'default':
        dftbls = sqlContext.sql(tmp)
    else:
        dftbls = dftbls.union(sqlContext.sql(tmp))

# Count each table with select * followed by .count()
tmplist = []
for row in dftbls.rdd.collect():
    tmp = 'select * from ' + row['database'] + '.' + row['tableName']
    tmpdf = sqlContext.sql(tmp)
    tmplist.append((row['database'], row['tableName'], tmpdf.count()))

columns = ['database', 'tableName', 'rowCount']
df = spark.createDataFrame(tmplist, columns)
display(df)
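
For comparison, the same listing can be written with the pyspark Catalog API instead of building SHOW TABLES strings. A minimal sketch (spark.catalog.listDatabases and spark.catalog.listTables are standard pyspark calls; the name-pattern filter from above is omitted):

# Enumerate databases and tables via the Catalog API and count each table
rows = []
for db in spark.catalog.listDatabases():
    for t in spark.catalog.listTables(db.name):
        if t.isTemporary:  # skip temp views, which SHOW TABLES also lists
            continue
        cnt = spark.table(db.name + '.' + t.name).count()
        rows.append((db.name, t.name, cnt))

df = spark.createDataFrame(rows, ['database', 'tableName', 'rowCount'])
display(df)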
azure-databricks
3 Answers
2 votes

I found this to be significantly faster...

# dftbl is unused below; kept from the original answer
dftbl = sqlContext.sql("show tables")
dfdbs = sqlContext.sql("show databases")

# Collect all tables from every database into one DataFrame
for row in dfdbs.rdd.collect():
    tmp = "show tables from " + row['databaseName']
    if row['databaseName'] == 'default':
        dftbls = sqlContext.sql(tmp)
    else:
        dftbls = dftbls.union(sqlContext.sql(tmp))

# Run count(*) per table instead of select * plus .count()
tmplist = []
for row in dftbls.rdd.collect():
    try:
        tmp = 'select count(*) myrowcnt from ' + row['database'] + '.' + row['tableName']
        tmpdf = sqlContext.sql(tmp)
        myrowcnt = tmpdf.collect()[0]['myrowcnt']
        tmplist.append((row['database'], row['tableName'], myrowcnt))
    except Exception:
        # tables that cannot be read are reported with -1
        tmplist.append((row['database'], row['tableName'], -1))

columns = ['database', 'tableName', 'rowCount']
df = spark.createDataFrame(tmplist, columns)
display(df)
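
Each count(*) above is an independent Spark job, so with many small tables the loop runs them one at a time. A hedged variant of the same answer, submitting the counts from a thread pool so the Spark scheduler can run several jobs concurrently (max_workers is an illustrative choice, not from the original answer):

from concurrent.futures import ThreadPoolExecutor

def count_table(row):
    # Each call triggers its own Spark job; unreadable tables report -1
    full_name = row['database'] + '.' + row['tableName']
    try:
        cnt = sqlContext.sql('select count(*) c from ' + full_name).collect()[0]['c']
        return (row['database'], row['tableName'], cnt)
    except Exception:
        return (row['database'], row['tableName'], -1)

# 8 workers is an arbitrary starting point; tune to the cluster
with ThreadPoolExecutor(max_workers=8) as pool:
    tmplist = list(pool.map(count_table, dftbls.collect()))

df = spark.createDataFrame(tmplist, ['database', 'tableName', 'rowCount'])
display(df)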

0 votes

You can also try this:

def fn_byDBgetCount():
  # Collect database names, then table names per database,
  # and run count(*) for each table
  final_list = []
  dbList = spark.sql("show databases").select("namespace").rdd.flatMap(lambda x: x).collect()
  for databaseName in dbList:
    spark.sql("use {}".format(databaseName))
    tableList = spark.sql("show tables from {}".format(databaseName)).select("tableName").rdd.flatMap(lambda x: x).collect()
    for tableName in tableList:
      tableCount = spark.sql("select count(*) as tableCount from {}".format(tableName)).collect()[0][0]
      final_list.append([databaseName, tableName, tableCount])
  column_names = ['DatabaseName', 'TableName', 'TableCount']
  df = spark.createDataFrame(final_list, column_names)
  display(df)

fn_byDBgetCount()
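
Note that this selects namespace while the answer above reads databaseName from the same SHOW DATABASES output; which column name you get depends on the Spark version. A small sketch that picks the column dynamically so the helper works on either:

# SHOW DATABASES returns 'databaseName' on Spark 2.x and 'namespace' on 3.x;
# taking the first column works for both
dbs_df = spark.sql("show databases")
dbList = dbs_df.select(dbs_df.columns[0]).rdd.flatMap(lambda x: x).collect()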


0 votes

Just pass your list of Delta tables in the list_tables list and get the count for each table:

# StringType lives in pyspark.sql.types, not pyspark.sql.functions
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField('table_name', StringType()),
    StructField('row_cnt', StringType()),
])
final_df = spark.createDataFrame([], schema)

list_tables = ['schema_name.table1', 'schema_name.table2']
bd = '2023-12-02'

print(list_tables)
print(bd)

for i in list_tables:
    # cast the count to string so each row matches the declared schema
    query = 'select "{}" as table_name, cast(count(*) as string) as row_cnt from {} where date_of_trans = "{}"'.format(i, i, bd)
    df1 = spark.sql(query)
    final_df = final_df.union(df1)

display(final_df)
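
Each union here adds a node to the logical plan, so with a long table list the plan keeps growing. An alternative sketch, using the same list_tables, bd, and schema as above, that collects each count on the driver and builds a single DataFrame at the end:

rows = []
for tbl in list_tables:
    cnt = spark.sql(
        'select count(*) as c from {} where date_of_trans = "{}"'.format(tbl, bd)
    ).collect()[0]['c']
    rows.append((tbl, str(cnt)))  # str() to match the StringType schema above

final_df = spark.createDataFrame(rows, schema)
display(final_df)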