我在 ADF 中有以下数据流,它解析 JSON 文件并使用ExternalCall(源)内部每个对象的 ID。从ExternalCall返回的数据内的JSON属性被转换(通过cachedMappings接收器)到适当的SQL(接收器)列。
此数据流的主要目的是利用漂移模式功能,因为我将传递不同的映射和参数来针对不同端点(来自同一服务)发出请求,并将响应映射到在 cachedMappings 中分配的映射。因此,我想避免任何建议将任何模式硬编码或直接映射到任何一项特定活动的解决方案。
我目前仅使用 JSON 属性的一小部分进行测试,当我检查 alterArray 和 sqlSink DataPreview 时,我可以看到预期的数据值和列,并且 DataPreview 列全部与相同的字段名称完全匹配(包括大写)在 sqlSink 表上。
问题: 当我调试(甚至触发)数据流时,发布的唯一列是“shop_id”。我尝试了许多不同的方法,包括更改 ADF 和 Sql 中的列类型,但没有成功。
AlterArray DataPreview - 正确显示将更新插入 2 行
sqlSink DataPreview - 正确显示 2 行 5 列全部为漂移
sqlSink 设置 - 启用架构漂移。表键值使用表达式 split($sinkKeys,',') 从值 'Transaction_Id,Order_Id' 中设置,该值拆分为 sql 表中的现有列
我完全不知所措。到目前为止,我发现的唯一可能是罪魁祸首的是输出文件(我也可以通过检查流程看到)是所有设置为 NULL 的列都引用 externalCall 中的“body”作为源列,而 shop_id 是(稍后在 addShopId 派生列中分配的)在此谱系中明确标识,并且它恰好是唯一设置有值的列。
外部呼叫
为了排除漂移列的任何问题,我一度添加了一项“派生列”活动来映射漂移列,但仍然没有成功。即使这会起作用,但它违背了此流程的目的,即传递“动态”映射和端点集以便能够解析不同的数据集。
下面也是输出 JSON 文件的链接。
如有任何帮助或建议,我们将不胜感激。请记住上面关于此数据流在利用漂移模式中的目的的注释。
您可能会遇到展平转换的问题,在外部调用活动之后,您可以使用以下方法来实现您的要求:
点击外部调用活动的输出选项卡中的导入投影,将会导入如下所示类型的页面数据:
它将数据作为复杂对象导入。
将派生列活动添加到外部调用活动,创建复杂数据列,如下所示:
它将根据需要创建列,使用select活动,选择所需的列,根据您的要求使用alter活动,添加带有插入和更新插入选项的SQL接收器,调试管道,数据将成功插入到SQL数据库中,如下所示如下图:
这里是数据流Json供您参考:
{
"name": "dataflow3",
"properties": {
"type": "MappingDataFlow",
"typeProperties": {
"sources": [
{
"dataset": {
"referenceName": "DelimitedText2",
"type": "DatasetReference"
},
"name": "source1"
}
],
"sinks": [
{
"dataset": {
"referenceName": "AzureSqlTable1",
"type": "DatasetReference"
},
"name": "sink1"
}
],
"transformations": [
{
"name": "externalCall1",
"linkedService": {
"referenceName": "RestService1",
"type": "LinkedServiceReference"
}
},
{
"name": "derivedColumn1"
},
{
"name": "AlterRow1"
},
{
"name": "derivedColumn2"
},
{
"name": "select1"
}
],
"scriptLines": [
"source(output(",
" ID as string,",
" DeltaValue as string",
" ),",
" allowSchemaDrift: true,",
" validateSchema: false,",
" ignoreNoFilesFound: false) ~> source1",
"derivedColumn1 call(output(",
" body as (body as string, id as string, title as string, userId as string)",
" ),",
" allowSchemaDrift: true,",
" format: 'rest',",
" store: 'restservice',",
" timeout: 30,",
" requestInterval: 0,",
" httpMethod: 'GET',",
" rowRelativeUrl: 'myid',",
" skipRowRelativeUrl: true,",
" bodyColumnName: 'body',",
" requestFormat: ['type' -> 'json'],",
" responseFormat: ['type' -> 'json', 'documentForm' -> 'singleDocument']) ~> externalCall1",
"source1 derive(myid = toString(ID)) ~> derivedColumn1",
"select1 alterRow(upsertIf((isNull(myid)==false()))) ~> AlterRow1",
"externalCall1 derive(id = body.id,",
" userId = body.userId,",
" title = body.title,",
" body = body.body) ~> derivedColumn2",
"derivedColumn2 select(mapColumn(",
" myid,",
" body,",
" id = derivedColumn2@id,",
" userId,",
" title",
" ),",
" skipDuplicateMapInputs: true,",
" skipDuplicateMapOutputs: true) ~> select1",
"AlterRow1 sink(allowSchemaDrift: true,",
" validateSchema: false,",
" deletable:false,",
" insertable:true,",
" updateable:false,",
" upsertable:true,",
" keys:['myid'],",
" format: 'table',",
" skipDuplicateMapInputs: true,",
" skipDuplicateMapOutputs: true,",
" errorHandlingOption: 'stopOnFirstError') ~> sink1"
]
}
}
}