Azure Databricks: unable to parameterize the keys option of dlt.apply_changes

Problem description

I created a metadata-driven DLT notebook that reads a configuration table and loads data from ADLS Gen2 into DLT tables. I was able to parameterize all the other options such as target, source, sequence_by, etc., but I am having trouble parameterizing the keys option. When the logic runs and the key parameter value (e.g. "pkey") is passed in, Spark splits the value into individual characters and tries to resolve each character as a column name, and I get:

    [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `[` cannot be resolved. Did you mean one of the following? [`Name`, `pkey`, `Value`, `dq_check`, `Description`]. SQLSTATE: 42703
    'Project ['[, '', 'p, 'k, 'e, 'y, '', ']]
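
For illustration, assuming the value read from the Keys column arrives as the literal string '["pkey"]', a plain Python string is itself a sequence of characters, so anything that iterates over it sees one "column name" per character, which matches the names in the error above:

    # Illustration only: iterating over the raw STRING value yields one entry per character.
    keys = '["pkey"]'      # value read from the Keys column of the metadata table
    print(list(keys))      # ['[', '"', 'p', 'k', 'e', 'y', '"', ']']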

--- SQL script for the configuration tables

    DROP TABLE IF EXISTS XyzBank.MetaData.SourceMetaData;
    CREATE TABLE IF NOT EXISTS XyzBank.MetaData.SourceMetaData
    (
    SourceMetaDataId BIGINT GENERATED ALWAYS AS IDENTITY
    ,SourceFileName STRING
    ,SourceFilePath STRING
    ,SourceFileFormat STRING    
    ,SourceActive STRING    
    ,SourceDelimeter STRING 
    ,SourceHeader STRING    
    ,SourcePartitionColumn STRING   
    ,SourceDataQuality STRING   
    ,SourceTransformation STRING    
    ,SourceTransformFunc STRING 
    ,SourceFileOptions STRING   
    ,SourceTableName STRING 
    ,SourceSchemaName STRING    
    ,ScdExceptColumnList STRING 
    ,ScdType    INT
    ,SequenceBy STRING
    ,Keys  STRING   
    ,SourceGroupId  INT
    ,CreatedOn TIMESTAMP
    ,CreatedBy STRING   
    ,ModifiedOn TIMESTAMP
    ,ModifiedBy STRING
    )
    
    
    INSERT INTO XyzBank.MetaData.SourceMetaData (SourceFilePath,SourceFileFormat,SourceActive,SourceDelimeter,SourceHeader,SourceDataQuality,SourceFileOptions,SourceTableName,ScdType,SequenceBy,Keys,SourceGroupId,CreatedOn,CreatedBy,ModifiedOn,ModifiedBy)
    SELECT 'abfss://[email protected]/DLT/', 'csv', 'True', ',', 'True', '{"validate Description":"(Description is NOT NULL)","validate Name":"(Name is NOT NULL)"}', '{"cloudFiles.format":"csv","header":True}', 'Product', 2,'file_process_date','["pkey"]',1, current_timestamp(), 'ABC', current_timestamp(), 'ABC'
    
    
    
    
    DROP TABLE IF EXISTS XyzBank.MetaData.SourceSchemaConfig;
    CREATE TABLE IF NOT EXISTS XyzBank.MetaData.SourceSchemaConfig
    (
    SourceSchemaConfigId BIGINT GENERATED ALWAYS AS IDENTITY    
    ,SourceMetaDataId   INT
    ,ColumnName STRING  
    ,ColumnDataType STRING  
    ,ColumnOrder INT    
    ,IsNullable STRING  
    ,IsSensitive STRING 
    ,CreatedOn TIMESTAMP    
    ,CreatedBy STRING   
    ,ModifiedOn TIMESTAMP   
    ,ModifiedBy STRING
    )
    
    
    
    INSERT INTO XyzBank.MetaData.SourceSchemaConfig (SourceMetaDataId,ColumnName,ColumnDataType,ColumnOrder,IsNullable,IsSensitive,CreatedOn,CreatedBy,ModifiedOn,ModifiedBy)
    SELECT 1, 'pkey', 'IntegerType', 1, 'False', 'No', current_timestamp(), 'ABC', current_timestamp(), 'ABC' UNION
    SELECT 1, 'Description', 'StringType', 2, 'True', 'No', current_timestamp(), 'ABC', current_timestamp(), 'ABC' UNION
    SELECT 1, 'Name', 'StringType', 3, 'True', 'No', current_timestamp(), 'ABC', current_timestamp(), 'ABC' UNION
    SELECT 1, 'Value', 'StringType', 4, 'True', 'No', current_timestamp(), 'ABC', current_timestamp(), 'ABC'

------ End of SQL script

# PySpark script

    import dlt 
    from pyspark.sql import SparkSession 
    from pyspark.sql.functions import col, lit, expr 
    from pyspark.sql import Row 
    import json 
    from pyspark.sql import functions as F 
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, TimestampType, DateType, DoubleType

    df = spark.sql("SELECT * FROM xyzbank.metadata.sourcemetadata WHERE SourceGroupId = 1")
    for row in df.collect():
        #print(row['SourceMetaDataId'])
        schema_query = f"SELECT * FROM xyzbank.metadata.sourceschemaconfig WHERE SourceMetaDataId = {row['SourceMetaDataId']}"
        #print(schema_query)
        df_schema = spark.sql(schema_query)
        #display(df_schema)
        
        
        data_type_mapping = {
            "StringType": StringType(),
            "IntegerType": IntegerType(),
            "TimeType": TimestampType(),
            "Datetime": DateType(),
            "DoubleType": DoubleType(),
            "DateType": DateType()
        }
    
    # Collect distinct values of "ColumnDataType", "ColumnName" and "ColumnOrder"
        distinct_datatypes = (
            df_schema.select("ColumnDataType", "ColumnName", "ColumnOrder").distinct().collect()
        )
    
        # Sort distinct_datatypes based on "ColumnOrder"
        distinct_datatypes = sorted(distinct_datatypes, key=lambda x: x.ColumnOrder)
    
        # Create schema fields
        schema_fields = [
            StructField(row.ColumnName, data_type_mapping[row.ColumnDataType], True)
            for row in distinct_datatypes
        ]
    
        # Create and return the schema
        schema = StructType(schema_fields)
        
        
        
        display(row)
        #dlt_ingestion_metdata_function(row=row, schema=schema)
        table_name = row['SourceTableName']
        checks = row['SourceDataQuality']
        checks = json.loads(checks)
        keys = row['Keys']
        display(keys)
        #keys = ["pkey"]
        print(keys)
        sequence_by = row['SequenceBy']
        display(sequence_by)
        file_path = row['SourceFilePath']
        cloud_file_options = eval(row['SourceFileOptions'])
        dq_rules = "({0})".format("AND".join(checks.values()))
    
        @dlt.table(
            name = "brz_load_"+table_name
        )
        def bronze_load():
            df3 = spark.readStream.format("cloudFiles").options(**cloud_file_options).schema(schema).load(file_path)
            df3 = df3.withColumn("file_process_date", F.current_timestamp())
            return df3
        
        @dlt.table(
            name = "stag_silver_load_"+table_name
        )
        @dlt.expect_all(checks)
        def stag_silver_table():
            df3 = dlt.readStream("brz_load_"+table_name)
            df3 = df3.withColumn("dq_check", F.expr(dq_rules)).filter("dq_check=true")
            return df3
        
    
        dlt.create_streaming_table(
            name = "silver_load_"+table_name
        )
        dlt.apply_changes(
            target = "silver_load_"+table_name,
            source = "stag_silver_load_"+table_name,
            keys=keys,
            stored_as_scd_type=2,
            sequence_by=sequence_by
        )
    
        @dlt.table(
            name = "quarantine_silver_load_"+table_name
        )
        @dlt.expect_all(checks)
        def quarantine_silver_table():
            df3 = dlt.readStream("brz_load_"+table_name)
            df3 = df3.withColumn("dq_check", F.expr(dq_rules)).filter("dq_check=false")
            return df3
        
    

 # end of PySpark script
Tags: azure databricks
1 Answer

The Keys value you inserted into XyzBank.MetaData.SourceMetaData is of STRING type with the value '["pkey"]', so convert it into an array (Python list) of strings before passing it to apply_changes.

Use the code below for the conversion.

    import json
    keys = json.loads(row['Keys'])

If the input is always a string of this form, '["pkey"]', the code above gives you the array of strings directly; otherwise, if the format of the stored string varies, adapt the conversion accordingly so that apply_changes always receives the keys as an array of strings containing the key columns.
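
Putting it together, here is a minimal sketch of how the parsed keys could be wired into the loop from the question (row, table_name and sequence_by come from that loop; the comma-separated fallback is only an assumption for other possible formats of the Keys column):

    import json
    import dlt

    def parse_keys(raw):
        # Keys is stored as a STRING such as '["pkey"]'; turn it into a Python list.
        try:
            parsed = json.loads(raw)
        except (TypeError, ValueError):
            # Assumed fallback for values like 'pkey' or 'pkey,other_key'.
            parsed = [k.strip() for k in str(raw).split(",") if k.strip()]
        return parsed if isinstance(parsed, list) else [parsed]

    keys = parse_keys(row['Keys'])   # ['pkey'] instead of the raw string '["pkey"]'

    dlt.create_streaming_table(
        name = "silver_load_" + table_name
    )
    dlt.apply_changes(
        target = "silver_load_" + table_name,
        source = "stag_silver_load_" + table_name,
        keys = keys,                 # now a list of column names
        stored_as_scd_type = 2,
        sequence_by = sequence_by
    )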
