I have multiple csv files without column names in blob storage. Using the Get Metadata activity I get the list of all these files, and I want to load each file into its corresponding table in Azure SQL using a Copy data activity inside a loop (each table has a different schema). Below are 2 sample files. I have tried this with a single file; now I want to process multiple files in a loop.
**sample1.txt**

```
5|1|300|100| |101|809|4|4|4|0|0|0|0|0|0|0|0|2|0|0|0|0|-4|0|0||2020-07-06
5|1|300|100| |102|809|6|5|5|0|0|0|0|0|0|0|0|2|0|0|0|0|-5|0|0||2020-07-14
5|1|300|100| |103|809|-1|-1|-1|0|0|0|0|0|0|0|0|2|0|0|0|0|1|0|0||2020-07-05
5|1|300|100| |104|809|7|7|7|0|0|0|0|0|0|0|0|2|0|0|0|0|-7|0|0||2020-07-05
5|1|300|100| |105|809|-5|-5|-5|0|0|0|0|0|0|0|0|2|0|0|0|0|5|0|0||2021-08-18
5|1|300|100| |106|809|0|0|0|0|0|0|0|0|0|0|0|2|0|0|0|0|0|0|0||2020-07-05
5|1|300|100| |107|809|8|8|8|0|0|0|0|0|0|0|0|2|0|0|0|0|-8|0|0||2020-07-14
5|1|300|100| |108|809|0|0|0|0|0|0|0|0|0|0|0|2|0|0|0|0|0|0|0||2020-07-08
5|1|300|100| |109|809|2|2|2|0|0|0|0|0|0|0|0|2|0|0|0|0|-2|0|0||2020-07-14
5|1|300|100| |111|809|2|2|2|0|0|0|0|0|0|0|0|2|0|0|0|0|-2|0|0||2020-07-07
5|1|300|100| |112|809|4|4|4|0|0|0|0|0|0|0|0|2|0|0|0|0|-4|0|0||2020-07-05
5|1|300|100| |114|809|3|3|3|0|0|0|0|0|0|0|0|2|0|0|0|0|-3|0|0||2020-07-08
```

**sample2.txt**

```
5|1|300|100| |131|809|0|0|0|0|0|0|0|0|0|0|0|2|0|0|0|0|0|0|0||2020-07-06
5|1|300|100| |132|809|7|7|7|0|0|0|0|0|0|0|0|2|0|0|0|0|-7|0|0||2020-07-08
5|1|300|100| |135|809|0|0|0|0|0|0|0|0|0|0|0|2|0|0|0|0|0|0|0||2020-07-05
5|1|300|100| |136|809|2|2|2|0|0|0|0|0|0|0|0|2|0|0|0|0|-2|0|0||2020-07-08
5|1|300|100| |138|809|5|5|5|0|0|0|0|0|0|0|0|2|0|0|0|0|-5|0|0||2020-07-08
5|1|300|100| |139|809|3|3|3|0|0|0|0|0|0|0|0|2|0|0|0|0|-3|0|0||2020-07-07
5|1|300|100| |140|809|0|0|0|0|0|0|0|0|0|0|0|2|0|0|0|0|0|0|0||2020-07-06
5|1|300|100| |142|809|0|0|0|0|0|0|0|0|0|0|0|2|0|0|0|0|0|0|0||2020-07-08
5|1|300|100| |143|809|3|3|3|0|0|0|0|0|0|0|0|2|0|0|0|0|-3|0|0||2020-07-14
5|1|300|100| |146|809|0|0|0|0|0|0|0|0|0|0|0|2|0|0|0|0|0|0|0||2020-07-05
```
For a single headerless source file you can do the mapping manually, but for multiple files the mapping has to be built dynamically, without relying on headers. Building the schema dynamically on every iteration is hard to do with a Copy activity; instead, use a data flow, which maps the schema to the target table dynamically.
If all the target SQL tables are already created, you need to loop over the array of table names and the Get Metadata file-list array at the same time.
First get the table name array in the correct order, matching the order of the file name array, and then iterate over both arrays in the same ForEach loop. See this SO answer on how to loop over 2 arrays of the same length simultaneously.
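As a minimal sketch of that approach (assuming two hypothetical pipeline array parameters, `table_names` and `file_names`, kept in the same order), iterate over an index range in the ForEach and pick the matching element from each array:

```
ForEach items:            @range(0, length(pipeline().parameters.file_names))
Table name inside loop:   @pipeline().parameters.table_names[item()]
File name inside loop:    @pipeline().parameters.file_names[item()]
```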
Alternatively, if your target table name is the same as the file name (`filename.txt`), there is no need to iterate over a second array; you can extract the table name from the file name itself. That is the approach I am following here.
First, pass the Get Metadata activity's childItems array to the ForEach activity's items.
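In the ForEach items dynamic content, that is the following expression (the activity name matches the Get Metadata1 activity in the pipeline JSON at the end):

```
@activity('Get Metadata1').output.childItems
```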
Create datasets for the source csv files and for the target SQL tables. To iterate over the files and tables, use dataset parameters: create string-type parameters and use them in the dynamic content for the file name and the table name, as shown below.
Source csv dataset with a dataset parameter for the file name:
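Since the screenshot is not reproduced here, this is a minimal sketch of that dataset, assuming an ADLS Gen2 linked service named `ls_adls` and a container named `input` (both hypothetical); the `filename` parameter name matches the datasetParameters used in the pipeline JSON below:

```
{
    "name": "source_csvs",
    "properties": {
        "linkedServiceName": {
            "referenceName": "ls_adls",
            "type": "LinkedServiceReference"
        },
        "parameters": {
            "filename": {
                "type": "string"
            }
        },
        "type": "DelimitedText",
        "typeProperties": {
            "location": {
                "type": "AzureBlobFSLocation",
                "fileName": {
                    "value": "@dataset().filename",
                    "type": "Expression"
                },
                "fileSystem": "input"
            },
            "columnDelimiter": "|",
            "firstRowAsHeader": false
        },
        "schema": []
    }
}
```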
Target SQL table dataset with a dataset parameter for the table name:
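Similarly, a minimal sketch of the SQL table dataset, assuming an Azure SQL linked service named `ls_azuresql` and the `dbo` schema (both hypothetical); the `table_name` parameter drives the table name dynamically:

```
{
    "name": "AzureSqlTable1",
    "properties": {
        "linkedServiceName": {
            "referenceName": "ls_azuresql",
            "type": "LinkedServiceReference"
        },
        "parameters": {
            "table_name": {
                "type": "string"
            }
        },
        "type": "AzureSqlTable",
        "typeProperties": {
            "schema": "dbo",
            "table": {
                "value": "@dataset().table_name",
                "type": "Expression"
            }
        },
        "schema": []
    }
}
```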
Here, the process is split into two parts: one copies data into tables that already exist, the other copies data into new tables by creating the table first.
To do that, first use the following query in a Lookup activity inside the ForEach to check whether the current table already exists. (The Lookup uses the SQL dataset; its table_name parameter can be given a dummy value such as '' because the query overrides the table reference, as in the pipeline JSON below.)
```
IF OBJECT_ID (N'@{split(item().name,'.')[0]}', N'U') IS NOT NULL
SELECT 1 AS res ELSE SELECT 0 AS res;
```
Use the following expression in the If Condition activity.
```
@equals(activity('Lookup1').output.firstRow.res, 1)
```
Now, use 2 data flows: one for the tables that already exist and another for the new tables.
In the data flow for existing tables, use a union transformation to add the header to the csv data, as follows. Take two sources: the target table dataset and the source csv dataset. To get only the columns of the target table, give the expression below in the target-table source's query dynamic expression. Create a string-type data flow parameter `table_name` and use it in the expression.
"select * from {$table_name} where 'Rakesh'='Laddu'"
Do the union by position, and add the same target table dataset as the sink of the data flow.
In the pipeline, inside the If activity's True activities, take an Execute Data Flow activity for this data flow and pass the dataset parameters as shown below. In the same activity, also pass the table name to the data flow parameter `table_name`, as a quoted string expression: `'@{split(item().name,'.')[0]}'`.
For the new tables, take another data flow with the csv dataset as the source and the SQL target table dataset as the sink, and enable the Recreate table option in the sink settings.
Add this data flow to the If activity's False activities and pass the same dataset parameter values as above.
Now execute the pipeline. For existing tables the data is inserted, and for tables that do not exist the corresponding table is created with ADF's default column names.
Data flow JSON for existing tables:

```
{
"name": "dataflow1",
"properties": {
"type": "MappingDataFlow",
"typeProperties": {
"sources": [
{
"dataset": {
"referenceName": "AzureSqlTable1",
"type": "DatasetReference"
},
"name": "source1"
},
{
"dataset": {
"referenceName": "source_csvs",
"type": "DatasetReference"
},
"name": "source2"
}
],
"sinks": [
{
"dataset": {
"referenceName": "AzureSqlTable1",
"type": "DatasetReference"
},
"name": "sink1"
}
],
"transformations": [
{
"name": "union1"
}
],
"scriptLines": [
"parameters{",
" table_name as string",
"}",
"source(allowSchemaDrift: true,",
" validateSchema: false,",
" isolationLevel: 'READ_UNCOMMITTED',",
" query: (\"select * from {$table_name} where 'Rakesh'='Laddu'\"),",
" format: 'query') ~> source1",
"source(allowSchemaDrift: true,",
" validateSchema: false,",
" ignoreNoFilesFound: false) ~> source2",
"source1, source2 union(byName: false)~> union1",
"union1 sink(allowSchemaDrift: true,",
" validateSchema: false,",
" deletable:false,",
" insertable:true,",
" updateable:false,",
" upsertable:false,",
" format: 'table',",
" skipDuplicateMapInputs: true,",
" skipDuplicateMapOutputs: true,",
" errorHandlingOption: 'stopOnFirstError') ~> sink1"
]
}
}
}
```
Data flow JSON for new tables:

```
{
"name": "dataflow2",
"properties": {
"type": "MappingDataFlow",
"typeProperties": {
"sources": [
{
"dataset": {
"referenceName": "source_csvs",
"type": "DatasetReference"
},
"name": "source1"
}
],
"sinks": [
{
"dataset": {
"referenceName": "AzureSqlTable1",
"type": "DatasetReference"
},
"name": "sink1"
}
],
"transformations": [],
"scriptLines": [
"source(allowSchemaDrift: true,",
" validateSchema: false,",
" ignoreNoFilesFound: false) ~> source1",
"source1 sink(allowSchemaDrift: true,",
" validateSchema: false,",
" deletable:false,",
" insertable:true,",
" updateable:false,",
" upsertable:false,",
" recreate:true,",
" format: 'table',",
" skipDuplicateMapInputs: true,",
" skipDuplicateMapOutputs: true,",
" errorHandlingOption: 'stopOnFirstError') ~> sink1"
]
}
}
}
```
Pipeline JSON:

```
{
"name": "pipeline1",
"properties": {
"activities": [
{
"name": "Get Metadata1",
"type": "GetMetadata",
"dependsOn": [],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"dataset": {
"referenceName": "Get_csv_list",
"type": "DatasetReference"
},
"fieldList": [
"childItems"
],
"storeSettings": {
"type": "AzureBlobFSReadSettings",
"enablePartitionDiscovery": false
},
"formatSettings": {
"type": "DelimitedTextReadSettings"
}
}
},
{
"name": "ForEach1",
"type": "ForEach",
"dependsOn": [
{
"activity": "Get Metadata1",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"items": {
"value": "@activity('Get Metadata1').output.childItems",
"type": "Expression"
},
"isSequential": true,
"activities": [
{
"name": "Lookup1",
"type": "Lookup",
"dependsOn": [],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"source": {
"type": "AzureSqlSource",
"sqlReaderQuery": {
"value": "IF OBJECT_ID (N'@{split(item().name,'.')[0]}', N'U') IS NOT NULL \n SELECT 1 AS res ELSE SELECT 0 AS res;",
"type": "Expression"
},
"queryTimeout": "02:00:00",
"partitionOption": "None"
},
"dataset": {
"referenceName": "AzureSqlTable1",
"type": "DatasetReference",
"parameters": {
"table_name": "''"
}
},
"firstRowOnly": true
}
},
{
"name": "If Condition1",
"type": "IfCondition",
"dependsOn": [
{
"activity": "Lookup1",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"expression": {
"value": "@equals(activity('Lookup1').output.firstRow.res, 1)",
"type": "Expression"
},
"ifFalseActivities": [
{
"name": "Data flow2",
"type": "ExecuteDataFlow",
"dependsOn": [],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"dataflow": {
"referenceName": "dataflow2",
"type": "DataFlowReference",
"datasetParameters": {
"source1": {
"filename": {
"value": "@item().name",
"type": "Expression"
}
},
"sink1": {
"table_name": {
"value": "@split(item().name,'.')[0]",
"type": "Expression"
}
}
}
},
"compute": {
"coreCount": 8,
"computeType": "General"
},
"traceLevel": "Fine"
}
}
],
"ifTrueActivities": [
{
"name": "Data flow1",
"type": "ExecuteDataFlow",
"dependsOn": [],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"dataflow": {
"referenceName": "dataflow1",
"type": "DataFlowReference",
"parameters": {
"table_name": {
"value": "'@{split(item().name,'.')[0]}'",
"type": "Expression"
}
},
"datasetParameters": {
"source1": {
"table_name": {
"value": "@split(item().name,'.')[0]",
"type": "Expression"
}
},
"source2": {
"filename": {
"value": "@item().name",
"type": "Expression"
}
},
"sink1": {
"table_name": {
"value": "@split(item().name,'.')[0]",
"type": "Expression"
}
}
}
},
"compute": {
"coreCount": 8,
"computeType": "General"
},
"traceLevel": "Fine"
}
}
]
}
}
]
}
}
],
"annotations": []
}
}
```