我在Data Factory中安排作业时遇到问题。我正在尝试每小时接近一个预定的工作,这将每小时执行相同的脚本,条件不同。
考虑一下我在Azure Data Lake Store中传播了一堆Avro文件,其格式如下。 /数据/ SomeEntity / {日期:YYYY} / {日期:MM} / {日期:DD} / {SomeEntity_日期:YYYY} {日期:MM} {日期:DD} __ {日期:H}
每小时将新文件添加到Data Lake Store。为了处理文件,我决定通过U-SQL虚拟文件集列和我在Data Lake Store中创建的一些SyncTable处理它们。
我的查询如下所示。
DECLARE @file_set_path string = /Data/SomeEntity/{date:yyyy}/{date:MM}/{date:dd}/SomeEntity_{date:yyyy}_{date:MM}_{date:dd}__{date:H};
@result = EXTRACT [Id] long,
....
date DateTime
FROM @file_set_path
USING someextractor;
@rdate =
SELECT MAX(ProcessedDate) AS ProcessedDate
FROM dbo.SyncTable
WHERE EntityName== "SomeEntity";
@finalResult = SELECT [Id],... FROM @result
CROSS JOIN @rdate AS r
WHERE date >= r.ProcessedDate;
因为我不能在where子句中使用rowset变量我将set与singe行交叉连接,但是即使在这种情况下,U-SQL也找不到正确的文件,并且总是返回所有设置的文件。
有任何解决方法或其他方法吗?
另请注意,文件集无法在动态连接上执行分区消除,因为在准备阶段优化程序不知道这些值。
我建议将同步点作为参数从ADF传递到处理脚本。然后优化器知道该值,并且文件集分区消除将启动。在最坏的情况下,您必须从先前脚本中的同步表中读取值并在下一个脚本中将其用作参数。
我认为这种方法应该有效,除非某些地方不太正确,即你能否确认dbo.SyncTable
表的数据类型?转出@rdate
并确保你获得的价值是你所期望的。
我整理了一个按预期工作的简单演示。我的SyncTable副本有一条值为01/01/2018的记录:
@working =
SELECT *
FROM (
VALUES
( (int)1, DateTime.Parse("2017/12/31") ),
( (int)2, DateTime.Parse("2018/01/01") ),
( (int)3, DateTime.Parse("2018/02/01") )
) AS x ( id, someDate );
@rdate =
SELECT MAX(ProcessedDate) AS maxDate
FROM dbo.SyncTable;
//@output =
// SELECT *
// FROM @rdate;
@output =
SELECT *, (w.someDate - r.maxDate).ToString() AS diff
FROM @working AS w
CROSS JOIN
@rdate AS r
WHERE w.someDate >= r.maxDate;
OUTPUT @output TO "/output/output.csv"
USING Outputters.Csv();
我尝试使用文件路径(完整脚本here)。要记住的是自定义日期格式H将小时表示为0到23之间的数字。如果您的SyncTable日期在插入时没有时间组件,则默认为午夜(0),这意味着整个将收集一天。根据您的模式,您的文件结构应如下所示:
"D:\Data Lake\USQLDataRoot\Data\SomeEntity\2017\12\31\SomeEntity_2017_12_31__8\test.csv"
我注意到你的文件路径在第二部分有一个下划线,在小时部分之前有一个双下划线(在0到23之间,单个数字到小时10)。我注意到你的文件集路径没有文件类型或引号 - 我在我的测试中使用了test.csv
。我的结果:
基本上我认为这种方法会起作用,但是有些东西不太正确,可能在你的文件结构中,你的SyncTable中的值,数据类型等等。你需要仔细检查细节,转出中间值来检查,直到找到问题。
wBob完整脚本的要点不能解决您的问题吗?这是wBob完整脚本的一个非常轻微编辑的版本,用于解决您提出的一些问题:
\Data\SomeEntity\2018\01\01\SomeEntity_2018_01_01__1
DECLARE @file_set_path string = @"/Data/SomeEntity/{date:yyyy}/{date:MM}/{date:dd}/SomeEntity_{date:yyyy}_{date:MM}_{date:dd}__{date:H}";
@input =
EXTRACT [Id] long,
date DateTime
FROM @file_set_path
USING Extractors.Text();
// in lieu of creating actual table
@syncTable =
SELECT * FROM
( VALUES
( "SomeEntity", new DateTime(2018,01,01,01,00,00) ),
( "AnotherEntity", new DateTime(2018,01,01,01,00,00) ),
( "SomeEntity", new DateTime(2018,01,01,00,00,00) ),
( "AnotherEntity", new DateTime(2018,01,01,00,00,00) ),
( "SomeEntity", new DateTime(2017,12,31,23,00,00) ),
( "AnotherEntity", new DateTime(2017,12,31,23,00,00) )
) AS x ( EntityName, ProcessedDate );
@rdate =
SELECT MAX(ProcessedDate) AS maxDate
FROM @syncTable
WHERE EntityName== "SomeEntity";
@output =
SELECT *,
date.ToString() AS dateString
FROM @input AS i
CROSS JOIN
@rdate AS r
WHERE i.date >= r.maxDate;
OUTPUT @output
TO "/output/output.txt"
ORDER BY Id
USING Outputters.Text(quoting:false);