我创建了一个 DLT 元数据驱动的笔记本,它从配置表读取数据并将数据从 ADLS Gen2 处理到 DLT 表。我能够参数化所有其他选项,如 target、source、sequence_by 等,但是在参数化 keys 选项时遇到了问题。执行逻辑时,如果参数值以字符串 '["pkey"]' 的形式传入,Spark 会把该字符串拆分成单个字符,并尝试把每个字符当作列名去解析,因此我收到如下错误:
[UNRESOLVED_COLUMN.WITH_SUGGESTION] 无法解析名为 "[" 的列、变量或函数参数。您指的是以下其中一项吗? [Name
,pkey
,Value
,dq_check
,Description
]。 SQL 状态: 42703;
'Project ['[, '", 'p, 'k, 'e, 'y, '", ']]
---配置表的 SQL 脚本
DROP TABLE IF EXISTS XyzBank.MetaData.SourceMetaData;
CREATE TABLE IF NOT EXISTS XyzBank.MetaData.SourceMetaData
(
SourceMetaDataId BIGINT GENERATED ALWAYS AS IDENTITY
,SourceFileName STRING
,SourceFilePath STRING
,SourceFileFormat STRING
,SourceActive STRING
,SourceDelimeter STRING
,SourceHeader STRING
,SourcePartitionColumn STRING
,SourceDataQuality STRING
,SourceTransformation STRING
,SourceTransformFunc STRING
,SourceFileOptions STRING
,SourceTableName STRING
,SourceSchemaName STRING
,ScdExceptColumnList STRING
,ScdType INT
,SequenceBy STRING
,Keys STRING
,SourceGroupId INT
,CreatedOn TIMESTAMP
,CreatedBy STRING
,ModifiedOn TIMESTAMP
,ModifiedBy STRING
)
INSERT INTO XyzBank.MetaData.SourceMetaData (SourceFilePath,SourceFileFormat,SourceActive,SourceDelimeter,SourceHeader,SourceDataQuality,SourceFileOptions,SourceTableName,ScdType,SequenceBy,Keys,SourceGroupId,CreatedOn,CreatedBy,ModifiedOn,ModifiedBy)
SELECT 'abfss://[email protected]/DLT/', 'csv', 'True', ',', 'True', '{"validate Description":"(Description is NOT NULL)","validate Name":"(Name is NOT NULL)"}', '{"cloudFiles.format":"csv","header":True}', 'Product', 2,'file_process_date','["pkey"]',1, current_timestamp(), 'ABC', current_timestamp(), 'ABC'
DROP TABLE IF EXISTS XyzBank.MetaData.SourceSchemaConfig;
CREATE TABLE IF NOT EXISTS XyzBank.MetaData.SourceSchemaConfig
(
SourceSchemaConfigId BIGINT GENERATED ALWAYS AS IDENTITY
,SourceMetaDataId INT
,ColumnName STRING
,ColumnDataType STRING
,ColumnOrder INT
,IsNullable STRING
,IsSensitive STRING
,CreatedOn TIMESTAMP
,CreatedBy STRING
,ModifiedOn TIMESTAMP
,ModifiedBy STRING
)
INSERT INTO XyzBank.MetaData.SourceSchemaConfig (SourceMetaDataId,ColumnName,ColumnDataType,ColumnOrder,IsNullable,IsSensitive,CreatedOn,CreatedBy,ModifiedOn,ModifiedBy)
SELECT 1, 'pkey', 'IntegerType', 1, 'False', 'No', current_timestamp(), 'ABC', current_timestamp(), 'ABC' UNION
SELECT 1, 'Description', 'StringType', 2, 'True', 'No', current_timestamp(), 'ABC', current_timestamp(), 'ABC' UNION
SELECT 1, 'Name', 'StringType', 3, 'True', 'No', current_timestamp(), 'ABC', current_timestamp(), 'ABC' UNION
SELECT 1, 'Value', 'StringType', 4, 'True', 'No', current_timestamp(), 'ABC', current_timestamp(), 'ABC'
------SQL 脚本结束
#pyspark 脚本
import dlt
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, expr
from pyspark.sql import Row
import json
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, TimestampType, DateType, DoubleType
df = spark.sql("SELECT * FROM xyzbank.metadata.sourcemetadata WHERE SourceGroupId = 1")
for row in df.collect():
#print(row['SourceMetaDataId'])
schema_query = f"SELECT * FROM xyzbank.metadata.sourceschemaconfig WHERE SourceMetaDataId = {row['SourceMetaDataId']}"
#print(schema_query)
df_schema = spark.sql(schema_query)
#display(df_schema)
data_type_mapping = {
"StringType": StringType(),
"IntegerType": IntegerType(),
"TimeType": TimestampType(),
"Datetime": DateType(),
"DoubleType": DoubleType(),
"DateType": DateType()
}
# Collect distint values of "ColumnDataType" and "ColumnName" and "ColunOrder"
distinct_datatypes = (
df_schema.select("ColumnDataType", "ColumnName", "ColumnOrder").distinct().collect()
)
# Sort distinct_datatypes based on "ColumnOrder"
distinct_datatypes = sorted(distinct_datatypes, key=lambda x: x.ColumnOrder)
# Create schema fields
schema_fields = [
StructField(row.ColumnName, data_type_mapping[row.ColumnDataType], True)
for row in distinct_datatypes
]
# Create and return the schema
schema = StructType(schema_fields)
display(row)
#dlt_ingestion_metdata_function(row=row, schema=schema)
table_name = row['SourceTableName']
checks = row['SourceDataQuality']
checks = json.loads(checks)
keys = row['Keys']
display(keys)
#keys = ["pkey"]
print(keys)
sequence_by = row['SequenceBy']
display(sequence_by)
file_path = row['SourceFilePath']
cloud_file_options = eval(row['SourceFileOptions'])
dq_rules = "({0})".format("AND".join(checks.values()))
@dlt.table(
name = "brz_load_"+table_name
)
def bronze_load():
df3 = spark.readStream.format("cloudFiles").options(**cloud_file_options).schema(schema).load(file_path)
df3 = df3.withColumn("file_process_date", F.current_timestamp())
return df3
@dlt.table(
name = "stag_silver_load_"+table_name
)
@dlt.expect_all(checks)
def stag_silver_table():
df3 = dlt.readStream("brz_load_"+table_name)
df3 = df3.withColumn("dq_check", F.expr(dq_rules)).filter("dq_check=true")
return df3
dlt.create_streaming_table(
name = "silver_load_"+table_name
)
dlt.apply_changes(
target = "silver_load_"+table_name,
source = "stag_silver_load_"+table_name,
keys=keys,
stored_as_scd_type=2,
sequence_by=sequence_by
)
@dlt.table(
name = "quarantine_silver_load_"+table_name
)
@dlt.expect_all(checks)
def quarantine_silver_table():
df3 = dlt.readStream("brz_load_"+table_name)
df3 = df3.withColumn("dq_check", F.expr(dq_rules)).filter("dq_check=false")
return df3
#end of pyspark script
您插入到 XyzBank.MetaData.SourceMetaData 的 Keys 值是字符串类型的 '["pkey"]',因此需要先把它转换为字符串数组,再传给 apply_changes。
使用下面的代码进行转换:
import json
keys = json.loads(row['Keys'])
如果输入始终是 '["pkey"]' 这种形式的 JSON 字符串,上面的代码即可得到字符串数组;否则,如果输入字符串的格式会变化,请相应调整转换逻辑,确保 apply_changes 收到的是包含各个键列名的字符串数组。