Databricks:动态 SQL |如何合并查询列表中的所有查询?

问题描述 投票:0回答:1

我正在开发一个遵循 databricks 中动态 SQL 逻辑的想法。目的是节省体力劳动。

我有一个表,其中存储了我将在动态 SQL 查询中使用的所有参数。然后我使用这些参数使用串联来生成 SQL 查询。

该表称为“DQ_FOM_SOURCE_SETUP”,它包含以下列和行: enter image description here

我写的动态SQL查询是:

-- 2. Dynamic SQL query for Source 2
SELECT 
'SELECT ' || 
TRIM(SOURCE_TABLE_GROUP_2) || ', ' ||
"'" || TRIM(SOURCE_COLUMN_2) || "' AS Metric, " ||
"'" || TRIM(SOURCE_TABLE_2) || "' AS TABLE_2_Name, " ||
'SUM(' || TRIM(SOURCE_COLUMN_2) || ') AS SUM_Value_2 ' ||
'FROM ' || TRIM(SOURCE_TABLE_2) || ' ' ||
'GROUP BY ' || TRIM(SOURCE_TABLE_GROUP_2) AS sql_statement
FROM DQ_FOM_SOURCE_SETUP;

结果如下所示: enter image description here

现在,下一步是合并结果中的所有查询。我尝试了多种方法,例如我尝试过:

SELECT CONCAT(dynamic_sql, ' UNION ALL ') AS full_sql
FROM (
SELECT 
    'SELECT ' || 
    TRIM(SOURCE_TABLE_GROUP_2) || ' AS ' || TRIM(SOURCE_COLUMN_ALIAS_2) || ', ' ||
    'SUM(' || TRIM(SOURCE_COLUMN_2) || ') AS SUM_' || TRIM(SOURCE_COLUMN_2) || ' FROM ' ||
    TRIM(SOURCE_TABLE_2) || ' GROUP BY ' || TRIM(SOURCE_TABLE_GROUP_2) AS dynamic_sql
FROM DQ_FOM_SOURCE_SETUP
) AS subquery;

但是它返回的仍然是一个查询列表,而不是一个联合所有子查询的大查询。

你知道如何合并结果表中的所有查询吗?我想找到一种有效的方法来联合它们,因为将来 DQ_FOM_SOURCE_SETUP 表中可能有数百行,我无法手动编写联合语句,这就是我想使用动态 SQL 的原因。非常感谢你的帮助。 enter image description here

sql database databricks azure-databricks databricks-sql
1个回答
0
投票

如果没问题,您可以使用 Pyspark 和 databricks SQL 的组合来实现此目的。

如果您只想获取实际查询,您可以迭代行并继续附加语句和 union 关键字。 但是,我的答案不仅会为您提供实际的最终查询,还会执行查询并为您提供结果。

编写一个 pyspark 函数,在其中传递 sql 语句并使用 df = Spark.sql( 在函数中执行,然后将 df 存储为临时视图。 继续为每个语句传递回视图,并让函数进行并集。

现在在函数之外,您可以从临时视图中进行选择,并且您可以获得数据。

首先,我正在创建示例输入表

%sql
CREATE OR REPLACE TABLE table1 (
    id INT,
    name VARCHAR(50)
);

INSERT INTO table1 VALUES
(1, 'John'),
(2, 'Jane'),
(3, 'Bob');

CREATE OR REPLACE TABLE sqlstmts (
    sqlstmt VARCHAR(50)
);

INSERT INTO sqlstmts VALUES
('SELECT * FROM table1 WHERE ID = 1'),
('SELECT * FROM table1 WHERE ID = 2');

接下来我执行我的函数。获取 SQL 语句后,我将其转换为 pandas 数据帧以对其进行迭代。

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

#Create an empty temp view with matching schema as required output table
schema = StructType([
    StructField("id", IntegerType(), True), StructField("name", StringType(), False)
])
tempdf = spark.createDataFrame([], schema)
tempdf.createOrReplaceTempView("temp_view")

def union_func(sql_stmt,view_name):
    df = spark.sql(sql_stmt)
    df.createOrReplaceTempView("temp_view2")
    uniondf = spark.sql(f"SELECT * FROM temp_view2 UNION SELECT * FROM {view_name}")
    uniondf.createOrReplaceTempView(view_name)

# Get your SQL statements
spark_df = spark.sql("SELECT * from sqlstmts;")
pandas_df = spark_df.toPandas()
final_union_stmt = ""

print("The SQL Statements are as follows---")
print("-------------------------------------------------")
for index, row in pandas_df.iterrows():
    if final_union_stmt != "":
        final_union_stmt = final_union_stmt + " union "
    print(row['sqlstmt'])
    union_func(row['sqlstmt'],"temp_view")
    final_union_stmt = final_union_stmt + row['sqlstmt']

print(f"Final Union statement is :{final_union_stmt};")

最后我从输出视图中获取结果

%sql
SELECT * FROM temp_view;

解决此问题的另一种方法是使用循环形成实际的 sql 语句,然后一次性执行它。如果只需要sql union语句,可以去掉sql语句的执行。

© www.soinside.com 2019 - 2024. All rights reserved.