我正在尝试找到获取所有 Databricks 表行数的最佳方法。这是我目前想到的做法:
# Build one DataFrame listing the tables named like 'xxx*' in every database,
# then count the rows of each table and display the result.
# NOTE(review): `dvdbs` is not defined in this snippet; presumably it holds the
# result of sqlContext.sql("show databases") -- confirm against the caller.
dftbls = None
for row in dvdbs.collect():
    tmp = "show tables from " + row['databaseName'] + " like 'xxx*'"
    db_tables = sqlContext.sql(tmp)
    # Seed the accumulator with the first result instead of relying on a
    # database named 'default' being listed first.
    dftbls = db_tables if dftbls is None else dftbls.union(db_tables)

tmplist = []
# No databases at all means there is nothing to iterate over.
for row in (dftbls.collect() if dftbls is not None else []):
    tmp = 'select * from ' + row['database'] + '.' + row['tableName']
    tmpdf = sqlContext.sql(tmp)
    tmplist.append((row['database'], row['tableName'], tmpdf.count()))

# An explicit schema lets createDataFrame succeed even when tmplist is empty
# (column names alone give it nothing to infer the types from).
df = spark.createDataFrame(
    tmplist, "database string, tableName string, rowCount long"
)
display(df)
我发现下面这种写法明显更快……
# List every table of every database, run a count(*) against each one and
# display the result. A table whose count query fails is reported with -1.
dfdbs = sqlContext.sql("show databases")

dftbls = None
for row in dfdbs.collect():
    db_tables = sqlContext.sql("show tables from " + row['databaseName'])
    # Seed the accumulator with the first result instead of relying on a
    # database named 'default' being listed first.
    dftbls = db_tables if dftbls is None else dftbls.union(db_tables)

tmplist = []
# No databases at all means there is nothing to iterate over.
for row in (dftbls.collect() if dftbls is not None else []):
    tmp = 'select count(*) myrowcnt from ' + row['database'] + '.' + row['tableName']
    try:
        myrowcnt = sqlContext.sql(tmp).collect()[0]['myrowcnt']
    except Exception:
        # Best effort: keep going and flag the table with -1. Catching
        # Exception (not a bare except) lets the run still be interrupted.
        myrowcnt = -1
    tmplist.append((row['database'], row['tableName'], myrowcnt))

# An explicit schema lets createDataFrame succeed even when tmplist is empty
# (column names alone give it nothing to infer the types from).
df = spark.createDataFrame(
    tmplist, "database string, tableName string, rowCount long"
)
display(df)
您也可以尝试使用下面这个函数:
def fn_byDBgetCount():
    """Display the row count of every table in every database.

    Runs ``show databases``, switches into each database with ``use``, runs
    ``select count(*)`` against every table listed by ``show tables`` and
    displays a DataFrame with the columns ``DatabaseName``, ``TableName``
    and ``TableCount``.

    The session's current database is restored before returning, because the
    ``use`` statements would otherwise leave it changed for the caller.
    """
    final_list = []
    original_db = spark.catalog.currentDatabase()
    try:
        db_rows = spark.sql("show databases").select("namespace").collect()
        for databaseName in (r[0] for r in db_rows):
            spark.sql("use {}".format(databaseName))
            table_rows = (
                spark.sql("show tables from {}".format(databaseName))
                .select("tableName")
                .collect()
            )
            for tableName in (r[0] for r in table_rows):
                tableCount = spark.sql(
                    "select count(*) as tableCount from {}".format(tableName)
                ).collect()[0][0]
                final_list.append((databaseName, tableName, tableCount))
    finally:
        # Undo the side effect of the `use` statements above.
        spark.catalog.setCurrentDatabase(original_db)

    # An explicit schema lets createDataFrame succeed even when final_list is
    # empty (column names alone give it nothing to infer the types from).
    df = spark.createDataFrame(
        final_list, "DatabaseName string, TableName string, TableCount long"
    )
    display(df)


fn_byDBgetCount()
只需把要统计的 Delta 表名放入 list_tables 列表,即可获取每个表的行数:
# StringType lives in pyspark.sql.types; import it from there rather than from
# pyspark.sql.functions.
from pyspark.sql.types import LongType, StringType, StructField, StructType

# Result layout: one row per table with its name and the number of rows that
# match the date filter.
schema = StructType([
    StructField('table_name', StringType()),
    # count(*) produces a bigint, so the column is declared as long, not string.
    StructField('row_cnt', LongType()),
])
# Empty DataFrame used as the starting point for the unions below.
final_df = spark.createDataFrame([], schema)

list_tables = ['schema_name.table1', 'schema_name.table2']
bd = '2023-12-02'
print(list_tables)
print(bd)

for i in list_tables:
    # One aggregate query per table; the table name is also emitted as a
    # literal so each result row identifies where it came from.
    query = 'select "{}" as table_name,count(*) as row_cnt from {} where date_of_trans="{}" '.format(i, i, bd)
    df1 = spark.sql(query)
    final_df = final_df.union(df1)

display(final_df)