我正在开发一个遵循 databricks 中动态 SQL 逻辑的想法。目的是节省体力劳动。
我有一个表,其中存储了我将在动态 SQL 查询中使用的所有参数。然后我使用这些参数使用串联来生成 SQL 查询。
该表称为“DQ_FOM_SOURCE_SETUP”,它包含以下列和行:
我写的动态SQL查询是:
-- 2. Dynamic SQL query for Source 2
SELECT
'SELECT ' ||
TRIM(SOURCE_TABLE_GROUP_2) || ', ' ||
"'" || TRIM(SOURCE_COLUMN_2) || "' AS Metric, " ||
"'" || TRIM(SOURCE_TABLE_2) || "' AS TABLE_2_Name, " ||
'SUM(' || TRIM(SOURCE_COLUMN_2) || ') AS SUM_Value_2 ' ||
'FROM ' || TRIM(SOURCE_TABLE_2) || ' ' ||
'GROUP BY ' || TRIM(SOURCE_TABLE_GROUP_2) AS sql_statement
FROM DQ_FOM_SOURCE_SETUP;
现在,下一步是合并结果中的所有查询。我尝试了多种方法,例如我尝试过:
SELECT CONCAT(dynamic_sql, ' UNION ALL ') AS full_sql
FROM (
SELECT
'SELECT ' ||
TRIM(SOURCE_TABLE_GROUP_2) || ' AS ' || TRIM(SOURCE_COLUMN_ALIAS_2) || ', ' ||
'SUM(' || TRIM(SOURCE_COLUMN_2) || ') AS SUM_' || TRIM(SOURCE_COLUMN_2) || ' FROM ' ||
TRIM(SOURCE_TABLE_2) || ' GROUP BY ' || TRIM(SOURCE_TABLE_GROUP_2) AS dynamic_sql
FROM DQ_FOM_SOURCE_SETUP
) AS subquery;
但是它返回的仍然是一个查询列表,而不是一个联合所有子查询的大查询。
你知道如何合并结果表中的所有查询吗?我想找到一种有效的方法来联合它们,因为将来 DQ_FOM_SOURCE_SETUP 表中可能有数百行,我无法手动编写联合语句,这就是我想使用动态 SQL 的原因。非常感谢你的帮助。
如果没问题,您可以使用 Pyspark 和 databricks SQL 的组合来实现此目的。
如果您只想获取实际查询,您可以迭代行并继续附加语句和 union 关键字。 但是,我的答案不仅会为您提供实际的最终查询,还会执行查询并为您提供结果。
编写一个 pyspark 函数,在其中传递 sql 语句并使用 df = Spark.sql(
现在在函数之外,您可以从临时视图中进行选择,并且您可以获得数据。
首先,我正在创建示例输入表
%sql
CREATE OR REPLACE TABLE table1 (
id INT,
name VARCHAR(50)
);
INSERT INTO table1 VALUES
(1, 'John'),
(2, 'Jane'),
(3, 'Bob');
CREATE OR REPLACE TABLE sqlstmts (
sqlstmt VARCHAR(50)
);
INSERT INTO sqlstmts VALUES
('SELECT * FROM table1 WHERE ID = 1'),
('SELECT * FROM table1 WHERE ID = 2');
接下来我执行我的函数。获取 SQL 语句后,我将其转换为 pandas 数据帧以对其进行迭代。
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
#Create an empty temp view with matching schema as required output table
schema = StructType([
StructField("id", IntegerType(), True), StructField("name", StringType(), False)
])
tempdf = spark.createDataFrame([], schema)
tempdf.createOrReplaceTempView("temp_view")
def union_func(sql_stmt,view_name):
df = spark.sql(sql_stmt)
df.createOrReplaceTempView("temp_view2")
uniondf = spark.sql(f"SELECT * FROM temp_view2 UNION SELECT * FROM {view_name}")
uniondf.createOrReplaceTempView(view_name)
# Get your SQL statements
spark_df = spark.sql("SELECT * from sqlstmts;")
pandas_df = spark_df.toPandas()
final_union_stmt = ""
print("The SQL Statements are as follows---")
print("-------------------------------------------------")
for index, row in pandas_df.iterrows():
if final_union_stmt != "":
final_union_stmt = final_union_stmt + " union "
print(row['sqlstmt'])
union_func(row['sqlstmt'],"temp_view")
final_union_stmt = final_union_stmt + row['sqlstmt']
print(f"Final Union statement is :{final_union_stmt};")
最后我从输出视图中获取结果
%sql
SELECT * FROM temp_view;
解决此问题的另一种方法是使用循环形成实际的 sql 语句,然后一次性执行它。如果只需要sql union语句,可以去掉sql语句的执行。