How to exclude rows in a Copy Data activity in Azure Data Factory?


I have built a pipeline with one Copy Data activity that copies data from Azure Data Lake and outputs it to Azure Blob Storage.

In the output I can see that some of my rows have no data, and I would like to exclude them from the copy. In the following example, row 2 contains no useful data:

{"TenantId":"qa","Timestamp":"2019-03-06T10:53:51.634Z","PrincipalId":2,"ControlId":"729c3b6e-0442-4884-936c-c36c9b466e9d","ZoneInternalId":0,"IsAuthorized":true,"PrincipalName":"John","StreetName":"Rue 1","ExemptionId":8}
{"TenantId":"qa","Timestamp":"2019-03-06T10:59:09.74Z","PrincipalId":null,"ControlId":null,"ZoneInternalId":null,"IsAuthorized":null,"PrincipalName":null,"StreetName":null,"ExemptionId":null}

In the Copy Data activity, how can I set up a rule to exclude rows that are missing certain values?

Here is my pipeline code:

{
    "name": "pipeline1",
    "properties": {
        "activities": [
            {
                "name": "Copy from Data Lake to Blob",
                "type": "Copy",
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [
                    {
                        "name": "Source",
                        "value": "tenantdata/events/"
                    },
                    {
                        "name": "Destination",
                        "value": "controls/"
                    }
                ],
                "typeProperties": {
                    "source": {
                        "type": "AzureDataLakeStoreSource",
                        "recursive": true
                    },
                    "sink": {
                        "type": "BlobSink",
                        "copyBehavior": "MergeFiles"
                    },
                    "enableStaging": false,
                    "translator": {
                        "type": "TabularTranslator",
                        "columnMappings": {
                            "Body.TenantId": "TenantId",
                            "Timestamp": "Timestamp",
                            "Body.PrincipalId": "PrincipalId",
                            "Body.ControlId": "ControlId",
                            "Body.ZoneId": "ZoneInternalId",
                            "Body.IsAuthorized": "IsAuthorized",
                            "Body.PrincipalName": "PrincipalName",
                            "Body.StreetName": "StreetName",
                            "Body.Exemption.Kind": "ExemptionId"
                        }
                    }
                },
                "inputs": [
                    {
                        "referenceName": "qadl",
                        "type": "DatasetReference"
                    }
                ],
                "outputs": [
                    {
                        "referenceName": "datalakestaging",
                        "type": "DatasetReference"
                    }
                ]
            }
        ]
    }
}
azure azure-data-factory

1 Answer

This is a very good question (+1 for that). I had the same problem a few months ago, and I was surprised that I could not find anything in the Copy activity to handle it (I even tried the fault tolerance feature, with no luck).

Since I was already doing other transformations in my pipeline with U-SQL, I ended up using it for this task as well. So instead of a Copy activity, I used a U-SQL Activity in ADF with an IS NOT NULL filter. It depends on your data, but you can adapt it; perhaps your strings contain "NULL" or empty strings "". Here is what it looks like:

DECLARE @file_set_path string = "adl://myadl.azuredatalake.net/Samples/Data/{date_utc:yyyy}{date_utc:MM}{date_utc:dd}T{date_utc:HH}{date_utc:mm}{date_utc:ss}Z.txt";

@data =
    EXTRACT 
            [id] string,
            date_utc DateTime
    FROM @file_set_path
    USING Extractors.Text(delimiter: '\u0001', skipFirstNRows : 1, quoting:false);

@result =
    SELECT
            [id],
            date_utc.ToString("yyyy-MM-ddTHH:mm:ss") AS SourceExtractDateUTC
    FROM @data
    WHERE id IS NOT NULL; -- you can also use WHERE id <> "" or id <> "NULL"

OUTPUT @result TO "wasb://samples@mywasb/Samples/Data/searchlog.tsv" USING Outputters.Text(delimiter: '\u0001', outputHeader:true);

Note: both ADLS and Blob Storage are supported for input/output files.

Let me know if this helps, or if the example above does not work for your data. I hope someone will post an answer that uses the Copy activity, which would be great, but so far this is one possibility.
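Outside of ADF, the same filtering idea can be sketched as a small pre-processing step over the NDJSON file before the copy. This is a minimal Python sketch, not part of the U-SQL solution above; the required-field list and the "drop a row only when all payload fields are null" rule are assumptions based on the sample rows in the question:

```python
import json

# Sample NDJSON rows from the question; row 2 has only TenantId and
# Timestamp populated and should be excluded.
rows = [
    '{"TenantId":"qa","Timestamp":"2019-03-06T10:53:51.634Z","PrincipalId":2,'
    '"ControlId":"729c3b6e-0442-4884-936c-c36c9b466e9d","ZoneInternalId":0,'
    '"IsAuthorized":true,"PrincipalName":"John","StreetName":"Rue 1","ExemptionId":8}',
    '{"TenantId":"qa","Timestamp":"2019-03-06T10:59:09.74Z","PrincipalId":null,'
    '"ControlId":null,"ZoneInternalId":null,"IsAuthorized":null,'
    '"PrincipalName":null,"StreetName":null,"ExemptionId":null}',
]

# Assumption: a row counts as "empty" when every one of these fields is null.
REQUIRED = ["PrincipalId", "ControlId", "ZoneInternalId", "IsAuthorized",
            "PrincipalName", "StreetName", "ExemptionId"]

def keep(line: str) -> bool:
    record = json.loads(line)
    # Keep the row if at least one required field has a non-null value.
    return any(record.get(field) is not None for field in REQUIRED)

filtered = [line for line in rows if keep(line)]
```

After this, `filtered` contains only the first row; the all-null row is dropped before it ever reaches the sink.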
