假设我们有这样简单的查询:
Select a.col1, b.col2 from tb1 as a inner join tb2 as b on tb1.col7 = tb2.col8;
结果应该是这样的:
tb1 col1
tb1 col7
tb2 col2
tb2 col8
我尝试使用一些Python库来解决这个问题:
1)即使使用
sqlparse
仅提取表格也可能是一个大问题。例如this官方书籍根本无法正常使用。
2)使用正则表达式似乎很难实现。
3)但后来我发现了this,这可能会有所帮助。然而问题是我无法连接到任何数据库并执行该查询。
有什么想法吗?
sql-metadata 是一个 Python 库,它使用 python-sqlparse 返回的标记化查询并生成查询元数据。
此元数据可以从您提供的 SQL 查询中返回列名和表名。以下是 sql-metadata github 自述文件中的几个示例:
>>> sql_metadata.get_query_columns("SELECT test, id FROM foo, bar")
[u'test', u'id']
>>> sql_metadata.get_query_tables("SELECT test, id FROM foo, bar")
[u'foo', u'bar']
>>> sql_metadata.get_query_limit_and_offset('SELECT foo_limit FROM bar_offset LIMIT 50 OFFSET 1000')
(50, 1000)
该库的托管版本位于 sql-app.infocruncher.com,看看它是否适合您。
确实,这不是一件容易的事。您可以使用词法分析器(本例中为ply)并定义多个规则来从字符串中获取多个标记。以下代码为 SQL 字符串的不同部分定义了这些规则,并将它们重新组合在一起,因为输入字符串中可能存在别名。结果,您得到一个以不同表名作为键的字典(
result
)。
import ply.lex as lex, re
tokens = (
"TABLE",
"JOIN",
"COLUMN",
"TRASH"
)
tables = {"tables": {}, "alias": {}}
columns = []
t_TRASH = r"Select|on|=|;|\s+|,|\t|\r"
def t_TABLE(t):
r"from\s(\w+)\sas\s(\w+)"
regex = re.compile(t_TABLE.__doc__)
m = regex.search(t.value)
if m is not None:
tbl = m.group(1)
alias = m.group(2)
tables["tables"][tbl] = ""
tables["alias"][alias] = tbl
return t
def t_JOIN(t):
r"inner\s+join\s+(\w+)\s+as\s+(\w+)"
regex = re.compile(t_JOIN.__doc__)
m = regex.search(t.value)
if m is not None:
tbl = m.group(1)
alias = m.group(2)
tables["tables"][tbl] = ""
tables["alias"][alias] = tbl
return t
def t_COLUMN(t):
r"(\w+\.\w+)"
regex = re.compile(t_COLUMN.__doc__)
m = regex.search(t.value)
if m is not None:
t.value = m.group(1)
columns.append(t.value)
return t
def t_error(t):
raise TypeError("Unknown text '%s'" % (t.value,))
t.lexer.skip(len(t.value))
# here is where the magic starts
def mylex(inp):
lexer = lex.lex()
lexer.input(inp)
for token in lexer:
pass
result = {}
for col in columns:
tbl, c = col.split('.')
if tbl in tables["alias"].keys():
key = tables["alias"][tbl]
else:
key = tbl
if key in result:
result[key].append(c)
else:
result[key] = list()
result[key].append(c)
print result
# {'tb1': ['col1', 'col7'], 'tb2': ['col2', 'col8']}
string = "Select a.col1, b.col2 from tb1 as a inner join tb2 as b on tb1.col7 = tb2.col8;"
mylex(string)
moz-sql-parser 是一个 Python 库,用于将 SQL-92 查询的某些子集转换为可 JSON 的解析树。也许这就是你想要的。
这是一个例子。
>>> parse("SELECT id,name FROM dual WHERE id>3 and id<10 ORDER BY name")
{'select': [{'value': 'id'}, {'value': 'name'}], 'from': 'dual', 'where': {'and': [{'gt': ['id', 3]}, {'lt': ['id', 10]}]}, 'orderby': {'value': 'name'}}
我正在解决类似的问题,并找到了一个更简单的解决方案,而且似乎效果很好。
import re
def tables_in_query(sql_str):
# remove the /* */ comments
q = re.sub(r"/\*[^*]*\*+(?:[^*/][^*]*\*+)*/", "", sql_str)
# remove whole line -- and # comments
lines = [line for line in q.splitlines() if not re.match("^\s*(--|#)", line)]
# remove trailing -- and # comments
q = " ".join([re.split("--|#", line)[0] for line in lines])
# split on blanks, parens and semicolons
tokens = re.split(r"[\s)(;]+", q)
# scan the tokens. if we see a FROM or JOIN, we set the get_next
# flag, and grab the next one (unless it's SELECT).
tables = set()
get_next = False
for tok in tokens:
if get_next:
if tok.lower() not in ["", "select"]:
tables.add(tok)
get_next = False
get_next = tok.lower() in ["from", "join"]
dictTables = dict()
for table in tables:
fields = []
for token in tokens:
if token.startswith(table):
if token != table:
fields.append(token)
if len(list(set(fields))) >= 1:
dictTables[table] = list(set(fields))
return dictTables
代码改编自https://grisha.org/blog/2016/11/14/table-names-from-sql/
创建数据库中存在的所有表的列表。然后,您可以在查询中搜索每个表名称。 这显然不是万无一失的,如果任何列/别名与表名匹配,代码就会中断。 但这可以作为一种解决方法来完成。
import pandas as pd
#%config PPMagics.autolimit=0
#txt = """<your SQL text here>"""
txt_1 = txt
replace_list = ['\n', '(', ')', '*', '=','-',';','/','.']
count = 0
for i in replace_list:
txt_1 = txt_1.replace(i, ' ')
txt_1 = txt_1.split()
res = []
for i in range(1, len(txt_1)):
if txt_1[i-1].lower() in ['from', 'join','table'] and txt_1[i].lower() != 'select':
count +=1
str_count = str(count)
res.append(txt_1[i] + "." + txt_1[i+1])
#df.head()
res_l = res
f_res_l = []
for i in range(0,len(res_l)):
if len(res_l[i]) > 15 : # change it to 0 is you want all the caught strings
f_res_l.append(res_l[i])
else :
pass
All_Table_List = f_res_l
print("All the unique tables from the SQL text, in the order of their appearence in the code : \n",100*'*')
df = pd.DataFrame(All_Table_List,columns=['Tables_Names'])
df.reset_index(level=0, inplace=True)
list_=list(df["Tables_Names"].unique())
df_1_Final = pd.DataFrame(list_,columns=['Tables_Names'])
df_1_Final.reset_index(level=0, inplace=True)
df_1_Final
不幸的是,为了成功地完成“复杂 SQL”查询,您或多或少必须为您正在使用的特定数据库引擎实现完整的解析器。
举个例子,考虑这个非常基本的复杂查询:
WITH a AS (
SELECT col1 AS c FROM b
)
SELECT c FROM a
在本例中,
a
不是表,而是公用表表达式 (CTE),应从输出中排除。使用 regexp:es 没有简单的方法来认识到 b
是表访问,但 a
不是 - 您的代码确实必须更深入地理解 SQL。
也可以考虑
SELECT * FROM tbl
您必须知道数据库特定实例中实际存在的列名称(并且特定用户也可以访问)才能正确回答该问题。
如果“适用于复杂 SQL”,您的意思是它必须适用于任何有效的 SQL 语句,您还需要指定适用于哪种 SQL 方言 - 或实施特定于方言的解决方案。一种适用于未实现 CTE:s 的数据库处理的任何 SQL 的解决方案将不适用于已实现 CTE:s 的数据库。
很抱歉这么说,但我认为您不会找到适用于任意复杂 SQL 查询的完整解决方案。您必须选择一个适用于特定 SQL 方言子集的解决方案。
对于我的简单用例(查询中的一个表,没有联接),我使用了以下调整
lst = "select * from table".split(" ")
lst = [item for item in lst if len(item)>0]
table_name = lst[lst.index("from")+1]
您可以使用此网站从任何 SQL 查询中单独提取表和 CTE。