因此,我将一个名为 LastRunTimestamp 的变量作为字符串发送到数据流的参数。时间戳是这种格式
2024-11-21T00:00:00.0000000Z
我的查询类似于
"SELECT * FROM c WHERE c.Discriminator = 'D1' AND c.InsertDateTime > '$LastRunTimestamp'"
查询不会出错,但它肯定忽略了参数。如果我在数据浏览器中对其进行硬编码或测试,它就会起作用。但我就是无法让它作为参数工作。
我尝试了很多不同的方法来做到这一点,包括
concat('SELECT * FROM c WHERE c.InsertDateTime > ','D1',' AND c.InsertDateTime > ',$LastRunTimestamp)
但是每次我让查询工作时,它似乎总是忽略该参数。我还尝试过在不使用 concat 函数等的情况下连接字符串。我不知道还能尝试什么。
JSON数据流定义
{
"name": "dataflow1",
"properties": {
"type": "MappingDataFlow",
"typeProperties": {
"sources": [
{
"dataset": {
"referenceName": "ContainerTesting",
"type": "DatasetReference"
},
"name": "source1"
}
],
"sinks": [
{
"dataset": {
"referenceName": "TestTable",
"type": "DatasetReference"
},
"name": "sink2"
},
{
"dataset": {
"referenceName": "Json1",
"type": "DatasetReference"
},
"name": "sink3"
}
],
"transformations": [
{
"name": "derivedColumn1"
},
{
"name": "derivedColumn2"
}
],
"scriptLines": [
"parameters{",
" LastRunTimestamp as string,",
"}",
"source(output(",
" Id as integer,",
" UpdateDateTime as string,",
" PartitionKey as integer,",
" Discriminator as string,",
" InsertDateTime as string",
" ),",
" allowSchemaDrift: true,",
" validateSchema: false,",
" limit: 100,",
" query: ("SELECT c.Id, c.Discriminator, c.PartitionKey, c.InsertDateTime, c.UpdateDateTime FROM c where c.Discriminator = 'D1' AND c.InsertDateTime > '$LastRunTimestamp'"),",
" format: 'documentQuery',",
" systemColumns: false) ~> source1",
"source1 derive(Blob_Location = concat('REDACTED', toString(Id), '.json'),",
" Load_Time = toString(currentUTC(),'yyyy-MM-dd HH:mm:ss'),",
" LastTimestamp = $LastRunTimestamp) ~> derivedColumn1",
"source1 derive(IdAsString = toString(Id)) ~> derivedColumn2",
"derivedColumn1 sink(allowSchemaDrift: true,",
" validateSchema: false,",
" deletable:false,",
" insertable:true,",
" updateable:false,",
" upsertable:false,",
" format: 'table',",
" skipDuplicateMapInputs: true,",
" skipDuplicateMapOutputs: true,",
" errorHandlingOption: 'stopOnFirstError') ~> sink2",
"derivedColumn2 sink(allowSchemaDrift: true,",
" validateSchema: false,",
" rowUrlColumn:'IdAsString',",
" skipDuplicateMapInputs: true,",
" skipDuplicateMapOutputs: true,",
" mapColumn(",
" Id,",
" UpdateDateTime,",
" PartitionKey,",
" Discriminator,",
" InsertDateTime,",
" IdAsString",
" )) ~> sink3"
]
}
}
}
要在 Cosmos db 的动态表达式查询中添加参数,您需要传递引号中包含的字符串参数。
为此,您需要传递如下查询:
concat("SELECT * FROM c WHERE c.Discriminator = 'D1' AND c.InsertDateTime > '" ,$LastRunTimestamp,"'")
所以,它将按如下方式工作,并且数据流将在查询中正确获取参数值:
SELECT * FROM c WHERE c.Discriminator = 'D1' AND c.InsertDateTime > ' $LastRunTimestamp_value'