对于较早记录 1 小时窗口内发生的记录,我想将
is_duplicate
设置为 TRUE
。
规则是每条记录都应对照最近的先前记录(其中
is_duplicate
为 FALSE
)进行检查,而不仅仅是前一行。
例如,在第 3 行中,时间戳应与第 1 行(最后一个非重复行)进行比较,而不是与已标记为重复的第 2 行进行比较。
我知道这可能需要在后处理作业中完成,但是有没有办法纯粹使用 SQL 来完成此操作?
我正在 Athena 中运行这些查询。
行 | 数据_时间戳 | 是_重复 | 解释 |
---|---|---|---|
1 | 2024-09-30 15:55:50 | 错误 | 没有之前的记录 |
2 | 2024-09-30 16:55:50 | 正确 | 与第 1 行的差异小于或等于 1 小时 |
3 | 2024-09-30 17:36:50 | 错误 | 与第 1 行的差异大于 1 小时 |
4 | 2024-09-30 17:40:50 | 正确 | 与第 3 行的差异小于或等于 1 小时 |
5 | 2024-09-30 17:50:03 | 正确 | 与第 3 行的差异小于或等于 1 小时 |
6 | 2024-09-30 20:27:24 | 错误 | 与第 3 行的差异大于 1 小时 |
7 | 2024-09-30 21:27:24 | 正确 | 与第 6 行的差异小于或等于 1 小时 |
8 | 2024-09-30 22:22:24 | 错误 | 与第 6 行的差异大于 1 小时 |
在 Athena 上使用窗口函数不起作用,因为在计算时它仍然不知道哪一行是最新的先前记录
is_duplicate
这实际上是一个有趣的挑战,并且可能有一种方法可以使用奇特的 Trino/Presto WINDOW 框架定义来使其与窗口函数一起使用,但是我诉诸于使用聚合。这个想法是使用
data_timestamps
函数生成 array_agg
数组,然后应用 reduce
函数,构建一个跟踪 is_duplicate
标志的并行数组,可以通过应用 filter
函数在归约操作期间引用该标志。下面是代码 - 我使用表中的原始数据作为参考,清理它,聚合并减少 CTE,最后 unnest
回到行并连接到原始数据集进行比较 - 下面是输出和查询我在雅典娜身上奔跑:
数据_时间戳 | 是_重复_计算 | 是_重复 | 解释 |
---|---|---|---|
2024-09-30 15:55:50 | 假 | 假 | 没有之前的记录 |
2024-09-30 16:55:50 | 真实 | 真实 | 与第 1 行的差异小于或等于 1 小时 |
2024-09-30 17:36:50 | 假 | 假 | 与第 1 行的差异大于 1 小时 |
2024-09-30 17:40:50 | 真实 | 真实 | 与第 3 行的差异小于或等于 1 小时 |
2024-09-30 17:50:03 | 真实 | 真实 | 与第 3 行的差异小于或等于 1 小时 |
2024-09-30 20:27:24 | 假 | 假 | 与第 3 行的差异大于 1 小时 |
2024-09-30 21:27:24 | 真实 | 真实 | 与第 6 行的差异小于或等于 1 小时 |
2024-09-30 22:22:24 | 假 | 假 | 与第 6 行的差异大于 1 小时 |
with data as
(
select 1 "row", '2024-09-30 15:55:50' data_timestamp, 'FALSE' is_duplicate, 'there''s no preceding logging' explanation
union all
select 2, '2024-09-30 16:55:50', 'TRUE', 'diff with row 1 is less than or equal to 1 hour'
union all
select 3, '2024-09-30 17:36:50', 'FALSE', 'diff with row 1 is larger than 1 hour'
union all
select 4, '2024-09-30 17:40:50', 'TRUE', 'diff with row 3 is less than or equal to 1 hour'
union all
select 5, '2024-09-30 17:50:03', 'TRUE', 'diff with row 3 is less than or equal to 1 hour'
union all
select 6, '2024-09-30 20:27:24', 'FALSE', 'diff with row 3 is larger than 1 hour'
union all
select 7, '2024-09-30 21:27:24', 'TRUE', 'diff with row 6 is less than or equal to 1 hour'
union all
select 8, '2024-09-30 22:22:24', 'FALSE', 'diff with row 6 is larger than 1 hour'
),
data_clean as
(
select
"row",
cast(data_timestamp as timestamp) data_timestamp,
if(is_duplicate = 'TRUE', true, false) is_duplicate,
explanation
from
data
),
data_agg as
(
select
array_agg(data_timestamp order by data_timestamp) timestamps
from
data_clean
),
data_dedup as
(
select
reduce
(
timestamps,
array[],
(s, x) -> s || (x, if(date_diff('minute', element_at(filter(s, y -> y[2] = false), -1)[1], x) <= 60, true, false)),
s -> s
) deduped_timestamps
from
data_agg
)
select
dt.ts[1] data_timestamp,
dt.ts[2] is_duplicate_computed,
dc.is_duplicate,
dc.explanation
from
data_dedup
cross join unnest(deduped_timestamps) as dt(ts)
inner join data_clean dc on dc.data_timestamp = dt.ts[1]
order by 1
;