Snowflake 中的动态 SQL 和游标迭代

问题描述 投票:0回答:1

我整个周末都坐着,试图弄清楚雪花过程块中游标迭代的实现。

目标是创建一个动态过程,在调用时检查我的 Snowflake DB 架构中的所有可用表是否存在(不)现有的 account_ids。该程序将用作数据控制测试,以检查我们的数据删除程序是否已正确运行。

我还想对声明块中的所有查询使用参数,以便我们可以轻松地将此过程适合具有相同表但存储在不同模式中的不同数据库。

即:SOURCE_DATABASE。LOM_US_DB_PUBLIC 与 SOURCE_DATABASE。LOM_EU_DB_PUBLIC

当前实施:

该过程在给定数据库模式中的所有表中检查与指定 ACCOUNT_ID 关联的记录。该过程采用三个参数:DB_NAME、SCHEMA_NAME 和 ACCOUNT_ID。它切换到指定的数据库和模式,迭代模式中的所有表,并构造一个 SQL 查询来对每个表中具有给定 ACCOUNT_ID 的记录进行计数。如果找到记录,它会将结果累积在摘要字符串中,该字符串将在过程结束时返回。

当前问题:

在实现

'For Loops'
或尝试使用
'Execute Immediately'
语句时,我不断遇到问题。我已经阅读了大部分 Snowflake 文档,但似乎找不到解决此问题的方法。

我不断收到相同的错误消息:

Syntax error: unexpected 'IMMEDIATE'. (line 22)

syntax error line 22 at position 35 unexpected 'INTO'. syntax error line 25 at position 8 unexpected 'IF'. (line 22)

Syntax error: unexpected 'FOR'. (line 21)

期望的结果:

在迭代结束时,该过程返回一个摘要,其中包含具有指定 ACCOUNT_ID 记录的所有表的详细信息。

PS:欢迎提供退货摘要建议!

CREATE OR REPLACE PROCEDURE DATA_DELETION_TEST(
    DB_NAME STRING, -- Set Parameters to customize test for different DBs and Schemas
    SCHEMA_NAME STRING,
    ACCOUNT_ID STRING
)
RETURNS STRING -- Procedure returns a string as result 
LANGUAGE SQL -- Procedure is written in SQL language 
EXECUTE AS CALLER -- Procedure has the same access permissions as the user who is executing it
AS
$$ -- Start of the procedure body
DECLARE
    CURSOR_RESULT STRING DEFAULT ''; -- summarizes which tables contain data for the specified account ID.
    TABLE_NAME STRING; -- construct the SQL query dynamically for each table.
    SQL_TEXT STRING; -- SQL query string that checks for data connected to the given account ID in that specific table.
    QUERY_RESULT STRING; -- holds the count of records found in the table for the specified account ID.
BEGIN -- Start of the Procedure Body
    -- Switch to the specified database
    EXECUTE IMMEDIATE 'USE DATABASE ' || DB_NAME || ';';
    -- Switch to the specified schema
    EXECUTE IMMEDIATE 'USE SCHEMA ' || SCHEMA_NAME || ';';

    -- Iterate over the tables in the specified schema
    FOR RECORD IN (SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = SCHEMA_NAME) DO
        -- RECORD.TABLE_NAME refers to the TABLE_NAME column from the cursor's result set
        TABLE_NAME := RECORD.TABLE_NAME;

        -- Construct the SQL query to check for data
        SQL_TEXT := 'SELECT COUNT(*) FROM ' || SCHEMA_NAME || '.' || TABLE_NAME || ' WHERE ACCOUNT_ID = ''' || ACCOUNT_ID || '''';

        -- Execute the query and fetch the result
        EXECUTE IMMEDIATE SQL_TEXT INTO QUERY_RESULT;

        -- If data exists, store the result
        IF QUERY_RESULT > 0 THEN
            CURSOR_RESULT := CURSOR_RESULT || 'Table: ' || TABLE_NAME || ' has ' || QUERY_RESULT || ' records for Account ID: ' || ACCOUNT_ID || '. ';
        END IF;
    END FOR;

    RETURN CURSOR_RESULT;
END;
$$; -- End of the procedure body

CALL DATA_DELETION_TEST('SOURCE_DATABASE', 'LOM_US_DB_PUBLIC', 'your_account_id');
stored-procedures snowflake-cloud-data-platform database-cursor
1个回答
0
投票

我总是发现使用存储过程以及获取和分配变量令人困惑,所以对下面的内容持保留态度。

我认为这不是你使用“INTO”的方式,我还修改了“IF”子句和获取姓名列表的方式:

CREATE OR REPLACE PROCEDURE DATA_DELETION_TEST( DB_NAME STRING, -- Set Parameters to customize test for different DBs and Schemas SCHEMA_NAME STRING, ACCOUNT_ID STRING ) RETURNS STRING -- Procedure returns a string as result LANGUAGE SQL -- Procedure is written in SQL language EXECUTE AS CALLER -- Procedure has the same access permissions as the user who is executing it AS DECLARE CURSOR_RESULT STRING DEFAULT ''; -- summarizes which tables contain data for the specified account ID. TABLE_NAME STRING; -- construct the SQL query dynamically for each table. SQL_TEXT STRING; -- SQL query string that checks for data connected to the given account ID in that specific table. QUERY_RESULT STRING; -- holds the count of records found in the table for the specified account ID. -- added vars: GET_TABLES_STATEMENT VARCHAR DEFAULT ( ' select table_name from ' || :DB_NAME || '.INFORMATION_SCHEMA.TABLES where table_schema = \'' || :SCHEMA_NAME || '\';' ); TABLES_AVAILABLE RESULTSET DEFAULT (EXECUTE IMMEDIATE GET_TABLES_STATEMENT); CUR_TABLE CURSOR for TABLES_AVAILABLE; holder RESULTSET; BEGIN -- Start of the Procedure Body -- Switch to the specified database EXECUTE IMMEDIATE 'USE DATABASE ' || DB_NAME || ';'; -- Switch to the specified schema EXECUTE IMMEDIATE 'USE SCHEMA ' || SCHEMA_NAME || ';'; -- Iterate over the tables in the specified schema FOR RECORD IN CUR_TABLE DO -- RECORD.TABLE_NAME refers to the TABLE_NAME column from the cursor's result set TABLE_NAME := RECORD.TABLE_NAME; -- Construct the SQL query to check for data SQL_TEXT := 'SELECT COUNT(*) AS CNT FROM ' || SCHEMA_NAME || '.' || TABLE_NAME || ' WHERE ACCOUNT_ID = ''' || ACCOUNT_ID || ''''; -- Execute the query and fetch the result holder := (EXECUTE IMMEDIATE SQL_TEXT); let c1 cursor for holder; open c1; fetch c1 into QUERY_RESULT; -- If data exists, store the result IF (QUERY_RESULT > 0) THEN CURSOR_RESULT := CURSOR_RESULT || 'Table: ' || TABLE_NAME || ' has ' || QUERY_RESULT || ' records for Account ID: ' || ACCOUNT_ID || '. '; END IF; END FOR; RETURN CURSOR_RESULT; END;
现在测试一下:
一些准备数据:

use schema test_db.sample_procedure_dvd; create or replace table test_db.sample_procedure_dvd.sample_data as ( select 'US' as account_id, 1 as id union all select 'UK', 2 ); create or replace table test_db.sample_procedure_dvd.sample_data_us as (select * from test_db.sample_procedure_dvd.sample_data where account_id = 'US'); create or replace table test_db.sample_procedure_dvd.sample_data_non_us as (select * from test_db.sample_procedure_dvd.sample_data where account_id != 'US');
我现在应该有 3 个表,其中 2 个有 account_id = US 的数据,一张仅适用于英国。

调用程序:

CALL DATA_DELETION_TEST('TEST_DB', 'SAMPLE_PROCEDURE_DVD', 'US');
返回预期输出:

Table: SAMPLE_DATA has 1 records for Account ID: US. Table: SAMPLE_DATA_US has 1 records for Account ID: US.


    

© www.soinside.com 2019 - 2024. All rights reserved.