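I have a collection in Azure Cosmos DB for MongoDB, and I am using a Copy activity to read the data and load it into Blob storage. I need to partition the data by row count. In the Copy activity I can see cursor methods such as Limit and Skip. How can I use them to, for example, fetch the first 100 records and load them into one file, then in the next iteration skip those first 100, fetch the next 100, and load them into another file?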
You can use the cursor methods limit and skip to do this.
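To illustrate what skip and limit do together, here is a minimal Python sketch. It pages through an in-memory list the way cursor.skip(n).limit(m) pages through a collection. The list, the function name fetch_page, and the 250-document size are all hypothetical.

The sketch assumes the source returns documents in the same order on every call. MongoDB only guarantees a stable order when a sort is applied, so it may be worth also setting a sort in the copy source's cursor methods. Verify that against your own setup.

```python
def fetch_page(docs, skip, limit):
    """Return at most `limit` documents after skipping the first `skip` ones,
    mimicking cursor.skip(skip).limit(limit) over an already-ordered list."""
    return docs[skip:skip + limit]

# Hypothetical collection of 250 documents, already in a stable order.
docs = [{"_id": i} for i in range(250)]

first = fetch_page(docs, 0, 100)     # documents 0..99
second = fetch_page(docs, 100, 100)  # documents 100..199
third = fetch_page(docs, 200, 100)   # documents 200..249 (only 50 remain)
empty = fetch_page(docs, 300, 100)   # nothing left to read
```

Each page becomes one output file. The first empty page is the signal that everything has been copied.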
Create the following Integer variables in the pipeline: count, rowsWritten, and temp.
Use Set variable activities to initialize the count and rowsWritten variables to 0 with the expression
@int('0')
Next, add an Until activity with the following dynamic expression:
@equals(variables('rowsWritten'), 0)
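You might expect the loop to exit immediately, because rowsWritten starts at 0 and the exit condition is rowsWritten == 0. It does not, because the Azure Data Factory documentation describes Until as similar to a do-until loop: the inner activities run before the condition is checked.

The Python sketch below models that behavior. The per-iteration row counts are hypothetical stand-ins for rowsRead values.

```python
def run_until(body, condition):
    """Emulate a do-until loop: execute `body`, then stop once `condition()` is true.
    The body always runs at least once, even if the condition is already true."""
    iterations = 0
    while True:
        body()
        iterations += 1
        if condition():
            return iterations

# Hypothetical rowsRead values returned by successive copy runs.
page_sizes = iter([100, 100, 50, 0])
state = {"rows_written": 0}  # like rowsWritten initialized to 0

def copy_page():
    # Stand-in for the Copy activity followed by updating rowsWritten.
    state["rows_written"] = next(page_sizes)

runs = run_until(copy_page, lambda: state["rows_written"] == 0)
# The loop did not exit early: it ran until a copy read 0 rows.
```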
Inside the Until activity, add a Copy activity with Cosmos DB for MongoDB as the source and Blob storage as the sink.
In the Copy activity source, set skip to the count variable and set limit to 100.
For this demo, I set limit to 2 (as in the JSON below).
Use a dataset parameter for the file name and set it to the following expression:
File@{variables('count')}.json
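Because count is the skip offset, each output file is named after the first row it contains. This small Python sketch mirrors the string interpolation for a page size of 100. The function name file_name is hypothetical.

```python
def file_name(count):
    # Mirrors the dataset-parameter expression File@{variables('count')}.json
    return f"File{count}.json"

# Offsets produced by successive iterations when limit is 100.
names = [file_name(skip) for skip in range(0, 300, 100)]
```

With limit 2, as in the demo, the names would be File0.json, File2.json, File4.json, and so on.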
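Now use a Set variable activity to increment the count variable and store the result in the temp variable. A Set variable activity cannot reference the variable it is setting, which is why temp is needed. Here I incremented by 2, but you need to increment by 100 so that the step matches your limit.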
Then store temp back into the count variable.
Update the rowsWritten variable with the number of rows read by the current Copy activity:
@activity('Copy data2').output.rowsRead
When this value is 0, the Until activity stops, which means all of the source data has been copied.
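Putting the steps together, the Python sketch below simulates one run of the pipeline against an in-memory list. The list and the files dictionary are hypothetical stand-ins for the collection and the blob container.

The sketch shows that the loop needs one extra Copy run that reads 0 rows before it stops. For N documents and a page size L, that is ceil(N / L) + 1 runs. It records only non-empty pages. Whether a real zero-row copy leaves an empty file in the container depends on the sink, so that is an assumption to check in your own setup.

```python
def copy_in_pages(docs, page_size):
    """Simulate the Until loop: copy with skip=count and limit=page_size,
    name each file after count, and stop once a copy reads 0 rows."""
    files = {}          # hypothetical stand-in for the blob container
    copy_runs = 0       # how many times the Copy activity executed
    count = 0           # pipeline variable 'count'
    while True:
        page = docs[count:count + page_size]      # Copy activity: skip/limit
        copy_runs += 1
        if page:                                  # empty-page file behavior not modeled
            files[f"File{count}.json"] = page     # sink file named from 'count'
        temp = count + page_size                  # Set variable3: temp = count + step
        count = temp                              # Set variable4: count = temp
        rows_written = len(page)                  # Set variable5: rowsWritten = rowsRead
        if rows_written == 0:                     # Until condition
            return files, copy_runs

files, copy_runs = copy_in_pages(list(range(250)), 100)
```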
Here is the pipeline JSON for reference:
{
  "name": "pipeline1",
  "properties": {
    "activities": [
      {
        "name": "Set variable1",
        "type": "SetVariable",
        "dependsOn": [],
        "policy": {
          "secureOutput": false,
          "secureInput": false
        },
        "userProperties": [],
        "typeProperties": {
          "variableName": "count",
          "value": {
            "value": "@int('0')",
            "type": "Expression"
          }
        }
      },
      {
        "name": "Set variable2",
        "type": "SetVariable",
        "dependsOn": [
          {
            "activity": "Set variable1",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ],
        "policy": {
          "secureOutput": false,
          "secureInput": false
        },
        "userProperties": [],
        "typeProperties": {
          "variableName": "rowsWritten",
          "value": {
            "value": "@int('0')",
            "type": "Expression"
          }
        }
      },
      {
        "name": "Until1",
        "type": "Until",
        "dependsOn": [
          {
            "activity": "Set variable2",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ],
        "userProperties": [],
        "typeProperties": {
          "expression": {
            "value": "@equals(variables('rowsWritten'), 0)",
            "type": "Expression"
          },
          "activities": [
            {
              "name": "Copy data2",
              "type": "Copy",
              "dependsOn": [],
              "policy": {
                "timeout": "0.12:00:00",
                "retry": 0,
                "retryIntervalInSeconds": 30,
                "secureOutput": false,
                "secureInput": false
              },
              "userProperties": [],
              "typeProperties": {
                "source": {
                  "type": "CosmosDbMongoDbApiSource",
                  "batchSize": 100,
                  "cursorMethods": {
                    "skip": {
                      "value": "@variables('count')",
                      "type": "Expression"
                    },
                    "limit": 2
                  }
                },
                "sink": {
                  "type": "JsonSink",
                  "storeSettings": {
                    "type": "AzureBlobStorageWriteSettings"
                  },
                  "formatSettings": {
                    "type": "JsonWriteSettings"
                  }
                },
                "enableStaging": false
              },
              "inputs": [
                {
                  "referenceName": "CosmosDbMongoDbCollection1",
                  "type": "DatasetReference"
                }
              ],
              "outputs": [
                {
                  "referenceName": "Json1",
                  "type": "DatasetReference",
                  "parameters": {
                    "FileName": {
                      "value": "File@{variables('count')}.json",
                      "type": "Expression"
                    }
                  }
                }
              ]
            },
            {
              "name": "Set variable3",
              "type": "SetVariable",
              "dependsOn": [
                {
                  "activity": "Copy data2",
                  "dependencyConditions": [
                    "Succeeded"
                  ]
                }
              ],
              "policy": {
                "secureOutput": false,
                "secureInput": false
              },
              "userProperties": [],
              "typeProperties": {
                "variableName": "temp",
                "value": {
                  "value": "@add(variables('count'), 2)",
                  "type": "Expression"
                }
              }
            },
            {
              "name": "Set variable4",
              "type": "SetVariable",
              "dependsOn": [
                {
                  "activity": "Set variable3",
                  "dependencyConditions": [
                    "Succeeded"
                  ]
                }
              ],
              "policy": {
                "secureOutput": false,
                "secureInput": false
              },
              "userProperties": [],
              "typeProperties": {
                "variableName": "count",
                "value": {
                  "value": "@variables('temp')",
                  "type": "Expression"
                }
              }
            },
            {
              "name": "Set variable5",
              "type": "SetVariable",
              "dependsOn": [
                {
                  "activity": "Set variable4",
                  "dependencyConditions": [
                    "Succeeded"
                  ]
                }
              ],
              "policy": {
                "secureOutput": false,
                "secureInput": false
              },
              "userProperties": [],
              "typeProperties": {
                "variableName": "rowsWritten",
                "value": {
                  "value": "@activity('Copy data2').output.rowsRead",
                  "type": "Expression"
                }
              }
            }
          ],
          "timeout": "0.12:00:00"
        }
      }
    ],
    "variables": {
      "count": {
        "type": "Integer"
      },
      "rowsWritten": {
        "type": "Integer"
      },
      "temp": {
        "type": "Integer"
      }
    },
    "annotations": []
  }
}