如何在 Azure Cosmos Mongo Db 中使用游标?

问题描述 投票:0回答:1

我在 Azure Cosmos Mongo Db 中有一个集合,我正在使用复制活动来读取数据并将其加载到 blob 中。我需要根据行数对数据进行分区,在复制活动中,我可以看到一些具有 Limit 和 Skip 的游标方法,我如何使用它来表示例如获取前 100 条记录将其加载到文件中,然后在下一次迭代中,跳过前 100 个,然后获取下一个 100 并将其加载到另一个文件中。

azure azure-data-factory azure-cosmosdb azure-cosmosdb-mongoapi
1个回答
0
投票

您可以根据您的要求使用Cursor方法limitskip

在管道中创建变量,如下所示:

enter image description here

使用设置变量活动,使用 0 表达式将 count

rowswriting
变量初始化为
@int('0')

接下来,使用 Until Activity 和以下动态表达式。

enter image description here

在 Until 活动中,采用以 cosmos mongo db 作为源、以 blob 作为接收器的复制活动。

在复制活动源中,为

count
提供
skip
变量,并将
limit
设置为
100

这里,我给出的限制为 2: enter image description here

使用数据集参数作为文件名并给出以下表达式。

File@{variables('count')}.json

enter image description here

现在,使用设置变量来递增

count
变量并将其存储在
temp
变量中。在这里,我做了 2,但您需要将其增加
100

enter image description here

将其存储回

count
变量。

enter image description here

使用当前复制活动读取的行数更新

rowswritten
变量。

@activity('Copy data2').output.rowsRead

enter image description here

如果这是

0
,那么我们的直到活动停止,这意味着源数据将被复制,如下所示。 enter image description here

这是管道json供参考:

{
    "name": "pipeline1",
    "properties": {
        "activities": [
            {
                "name": "Set variable1",
                "type": "SetVariable",
                "dependsOn": [],
                "policy": {
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "variableName": "count",
                    "value": {
                        "value": "@int('0')",
                        "type": "Expression"
                    }
                }
            },
            {
                "name": "Set variable2",
                "type": "SetVariable",
                "dependsOn": [
                    {
                        "activity": "Set variable1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "policy": {
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "variableName": "rowsWritten",
                    "value": {
                        "value": "@int('0')",
                        "type": "Expression"
                    }
                }
            },
            {
                "name": "Until1",
                "type": "Until",
                "dependsOn": [
                    {
                        "activity": "Set variable2",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "expression": {
                        "value": "@equals(variables('rowsWritten'), 0)",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "Copy data2",
                            "type": "Copy",
                            "dependsOn": [],
                            "policy": {
                                "timeout": "0.12:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "source": {
                                    "type": "CosmosDbMongoDbApiSource",
                                    "batchSize": 100,
                                    "cursorMethods": {
                                        "skip": {
                                            "value": "@variables('count')",
                                            "type": "Expression"
                                        },
                                        "limit": 2
                                    }
                                },
                                "sink": {
                                    "type": "JsonSink",
                                    "storeSettings": {
                                        "type": "AzureBlobStorageWriteSettings"
                                    },
                                    "formatSettings": {
                                        "type": "JsonWriteSettings"
                                    }
                                },
                                "enableStaging": false
                            },
                            "inputs": [
                                {
                                    "referenceName": "CosmosDbMongoDbCollection1",
                                    "type": "DatasetReference"
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "Json1",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "FileName": {
                                            "value": "File@{variables('count')}.json",
                                            "type": "Expression"
                                        }
                                    }
                                }
                            ]
                        },
                        {
                            "name": "Set variable3",
                            "type": "SetVariable",
                            "dependsOn": [
                                {
                                    "activity": "Copy data2",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "policy": {
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "variableName": "temp",
                                "value": {
                                    "value": "@add(variables('count'), 2)",
                                    "type": "Expression"
                                }
                            }
                        },
                        {
                            "name": "Set variable4",
                            "type": "SetVariable",
                            "dependsOn": [
                                {
                                    "activity": "Set variable3",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "policy": {
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "variableName": "count",
                                "value": {
                                    "value": "@variables('temp')",
                                    "type": "Expression"
                                }
                            }
                        },
                        {
                            "name": "Set variable5",
                            "type": "SetVariable",
                            "dependsOn": [
                                {
                                    "activity": "Set variable4",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "policy": {
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "variableName": "rowsWritten",
                                "value": {
                                    "value": "@activity('Copy data2').output.rowsRead",
                                    "type": "Expression"
                                }
                            }
                        }
                    ],
                    "timeout": "0.12:00:00"
                }
            }
        ],
        "variables": {
            "count": {
                "type": "Integer"
            },
            "rowsWritten": {
                "type": "Integer"
            },
            "temp": {
                "type": "Integer"
            }
        },
        "annotations": []
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.