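I need to write an SQL query that merges multiple similar flows. Two flows count as similar when their sourceIPAddress, destinationIPAddress, sourceTransportPort and destinationTransportPort are the same, and the next flow record's flowStartMilliseconds is less than the previous flow's flowEndMilliseconds + (3600 x 1000), i.e. it starts less than one hour after the previous flow ends. When these conditions match, I need to merge those flows (rows) into one: for example, sum the bytes and packets columns, use the first row's flowStartMilliseconds as the merged flowStartMilliseconds, and use the last row's flowEndMilliseconds as the merged flowEndMilliseconds.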
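For reference, the merge rule above can be written as a plain procedural scan: sort by the 4-tuple and start time, then keep extending the current merged flow while each flow starts less than one hour after the previous flow's end. This is a minimal sketch, not an SQL solution; the row-as-dict layout and the `bytes` and `packets` keys are hypothetical, since the question does not name those columns.

```python
from itertools import groupby

# Threshold from the question: 3600 x 1000 ms, i.e. one hour.
GAP_MS = 3600 * 1000

# Columns that must match for two flows to be merged.
KEY_COLUMNS = ("sourceIPAddress", "destinationIPAddress",
               "sourceTransportPort", "destinationTransportPort")


def flow_key(row):
    return tuple(row[c] for c in KEY_COLUMNS)


def merge_flows(rows, gap_ms=GAP_MS):
    """Merge flows per the rule above; `bytes` and `packets` are hypothetical keys."""
    merged = []
    ordered = sorted(rows, key=lambda r: (flow_key(r), r["flowStartMilliseconds"]))
    for _, flows in groupby(ordered, key=flow_key):
        current = None   # merged flow being built for this 4-tuple
        prev_end = None  # end time of the previous (unmerged) flow
        for f in flows:
            if current is not None and f["flowStartMilliseconds"] < prev_end + gap_ms:
                # Within one hour of the previous flow: extend it and add the counters.
                current["flowEndMilliseconds"] = f["flowEndMilliseconds"]
                current["bytes"] += f["bytes"]
                current["packets"] += f["packets"]
            else:
                # First flow of this 4-tuple, or the gap is too large: start a new one.
                current = dict(f)  # copy so the input rows are not mutated
                merged.append(current)
            prev_end = f["flowEndMilliseconds"]
    return merged
```

Comparing each flow with the previous flow's end, rather than with a running maximum, mirrors what `lag(flowEndMilliseconds)` does in SQL.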
This is what I have done so far:
select
    sourceIPAddress,
    destinationIPAddress,
    sourceTransportPort,
    destinationTransportPort,
    protocol,
    serviceName,
    flowStartMilliseconds,
    flowEndMilliseconds,
    row_number() over (
        partition by sourceIPAddress,
                     destinationIPAddress,
                     sourceTransportPort,
                     destinationTransportPort
        order by flowStartMilliseconds
    ) as row_num,
    lag(flowEndMilliseconds) over (
        partition by sourceIPAddress,
                     destinationIPAddress,
                     sourceTransportPort,
                     destinationTransportPort
        order by flowStartMilliseconds
    ) as lag_value,
    (lag_value + 3600 * 1000) as next_val,
    -- 1 when this flow starts less than one hour after the previous flow ends
    case when flowStartMilliseconds < next_val then 1 else 0 end as condition
from
    read_parquet(
        '//path/*/*/*/*/*/*.parquet', hive_partitioning = 1
    )
where
    protocol = 'TCP';
I am using DuckDB for this query.
As in the screenshot, all three rows should be merged into a single row.
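The query above computes a per-row flag, but nothing in it gives all rows of one run a shared id, so there is still nothing to GROUP BY. The sketch below reproduces that flag on three made-up flows, using Python's built-in `sqlite3` module as a stand-in for DuckDB (assumption: SQLite 3.25 or later, which has window functions). SQLite cannot reuse a SELECT alias in the same SELECT list, so `lag_value` is computed in a subquery first.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("""
    CREATE TABLE flows (
        sourceIPAddress TEXT, destinationIPAddress TEXT,
        sourceTransportPort INTEGER, destinationTransportPort INTEGER,
        flowStartMilliseconds INTEGER, flowEndMilliseconds INTEGER)""")
# Made-up data: three flows of one 4-tuple; the third starts more than
# an hour after the second one ends.
con.executemany(
    "INSERT INTO flows VALUES ('10.0.0.1', '10.0.0.2', 1234, 443, ?, ?)",
    [(0, 1000), (2000, 3000), (5_000_000, 5_001_000)])

rows = con.execute("""
    SELECT flowStartMilliseconds, lag_value,
           CASE WHEN flowStartMilliseconds < lag_value + 3600 * 1000
                THEN 1 ELSE 0 END AS condition
    FROM (
        SELECT flowStartMilliseconds,
               LAG(flowEndMilliseconds) OVER (
                   PARTITION BY sourceIPAddress, destinationIPAddress,
                                sourceTransportPort, destinationTransportPort
                   ORDER BY flowStartMilliseconds) AS lag_value
        FROM flows)
    ORDER BY flowStartMilliseconds""").fetchall()
print(rows)  # [(0, None, 0), (2000, 1000, 1), (5000000, 3000, 0)]
```

The first flow has no predecessor, so `lag_value` is NULL, the comparison yields NULL, and the `ELSE 0` branch turns it into 0.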
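Since your question does not include a minimal, reproducible example, I created a simple data frame to illustrate the task. I use smaller numbers for readability, and use 3 as the threshold distance between rows at which a new group starts: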
┌──────────┬───────┬─────┐
│ group_id ┆ start ┆ end │
│ --- ┆ --- ┆ --- │
│ i64 ┆ i64 ┆ i64 │
╞══════════╪═══════╪═════╡
│ 1 ┆ 1 ┆ 5 │ # start of the continuous group 1
│ 1 ┆ 7 ┆ 8 │ # 7 < 5 + 3 => still in the group 1
│ 1 ┆ 10 ┆ 11 │ # 10 < 8 + 3 => still in the group 1
│ 1 ┆ 15 ┆ 16 │ # 15 >= 11 + 3 => start of the continuous group 2
│ 1 ┆ 20 ┆ 23 │ # 20 >= 16 + 3 => start of the continuous group 3
│ 1 ┆ 25 ┆ 30 │ # 25 < 23 + 3 => still in the group 3
│ 2 ┆ 2 ┆ 5 │
│ 2 ┆ 7 ┆ 10 │
└──────────┴───────┴─────┘
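The query below is a "gaps and islands" approach: mark every row that starts a new run, then take a running sum of the marks so that all rows of one run share the same value. A small Python sketch of those two steps on the rows of group_id 1, already ordered by start:

```python
from itertools import accumulate

THRESHOLD = 3  # gap at which a new run starts, as in the example above

# (start, end) pairs of group_id 1, ordered by start.
rows = [(1, 5), (7, 8), (10, 11), (15, 16), (20, 23), (25, 30)]

# group_start = 1 when start >= previous end + THRESHOLD; the first row has
# no predecessor (lag is NULL in SQL), so it gets 0.
flags = [0] + [int(start >= prev_end + THRESHOLD)
               for (start, _), (_, prev_end) in zip(rows[1:], rows)]
# Running sum of the flags = id of the continuous run each row belongs to.
rle_ids = list(accumulate(flags))
print(flags)    # [0, 0, 0, 1, 1, 0]
print(rle_ids)  # [0, 0, 0, 1, 2, 2]
```

Grouping by (group_id, rle_id) then collapses each run into one row.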
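with cte as (
    -- group_start = 1 when the gap to the previous row's end is >= 3;
    -- the first row of each group_id has lag_value NULL, so it gets 0.
    -- lag is ordered by start, consistent with the running sum below.
    select
        d.group_id, d.start, d.end,
        lag(d.end) over (partition by d.group_id order by d.start) as lag_value,
        (lag_value + 3) as next_val,
        case when d.start >= next_val then 1 else 0 end as group_start
    from df as d
), cte2 as (
    -- running sum of the flags: every continuous run gets its own rle_id
    select
        d.group_id, d.start, d.end,
        sum(d.group_start) over (partition by d.group_id order by d.start) as rle_id
    from cte as d
)
select
    d.group_id,
    min(d.start) as start,
    max(d.end) as end,
    count(*) as cnt_records
from cte2 as d
group by
    d.group_id,
    d.rle_id
order by
    d.group_id,
    min(d.start)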
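To check the query against the expected output, here is a sketch that runs it on the sample data with Python's `sqlite3` module instead of DuckDB (assumption: SQLite 3.25 or later). Two adaptations are made for SQLite: the `lag(...) + 3` expression is written inline, because SQLite does not let `lag_value` be reused as an alias in the same SELECT list, and `end` is double-quoted to be safe, since it is an SQL keyword.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute('CREATE TABLE df (group_id INTEGER, start INTEGER, "end" INTEGER)')
con.executemany("INSERT INTO df VALUES (?, ?, ?)", [
    (1, 1, 5), (1, 7, 8), (1, 10, 11), (1, 15, 16), (1, 20, 23), (1, 25, 30),
    (2, 2, 5), (2, 7, 10)])

query = """
WITH cte AS (
    SELECT d.group_id, d.start, d."end",
           CASE WHEN d.start >= LAG(d."end") OVER (
                    PARTITION BY d.group_id ORDER BY d.start) + 3
                THEN 1 ELSE 0 END AS group_start
    FROM df AS d
), cte2 AS (
    SELECT d.group_id, d.start, d."end",
           SUM(d.group_start) OVER (
               PARTITION BY d.group_id ORDER BY d.start) AS rle_id
    FROM cte AS d
)
SELECT d.group_id, MIN(d.start) AS start, MAX(d."end") AS "end",
       COUNT(*) AS cnt_records
FROM cte2 AS d
GROUP BY d.group_id, d.rle_id
ORDER BY d.group_id, MIN(d.start)
"""
result = con.execute(query).fetchall()
for row in result:
    print(row)
```

Each printed tuple corresponds to one row of the result table.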
┌──────────┬───────┬─────┬─────────────┐
│ group_id ┆ start ┆ end ┆ cnt_records │
│ --- ┆ --- ┆ --- ┆ --- │
│ i64 ┆ i64 ┆ i64 ┆ i64 │
╞══════════╪═══════╪═════╪═════════════╡
│ 1 ┆ 1 ┆ 11 ┆ 3 │
│ 1 ┆ 15 ┆ 16 ┆ 1 │
│ 1 ┆ 20 ┆ 30 ┆ 2 │
│ 2 ┆ 2 ┆ 10 ┆ 2 │
└──────────┴───────┴─────┴─────────────┘
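Applied to the flow columns from the question, the same pattern partitions by the 4-tuple, uses the 3600 * 1000 ms threshold, and sums the counters when collapsing each run. This is a sketch run against SQLite with made-up rows; `bytes` and `packets` are hypothetical names for the byte and packet columns, and `MAX(flowEndMilliseconds)` is used for the merged end, which matches the last row's value whenever end times increase along with start times.

```python
import sqlite3

# The 4-tuple that identifies a conversation.
KEY = ("sourceIPAddress, destinationIPAddress, "
       "sourceTransportPort, destinationTransportPort")

QUERY = f"""
WITH flagged AS (
    SELECT *,
           CASE WHEN flowStartMilliseconds >= LAG(flowEndMilliseconds) OVER (
                    PARTITION BY {KEY} ORDER BY flowStartMilliseconds)
                    + 3600 * 1000
                THEN 1 ELSE 0 END AS group_start
    FROM flows
), numbered AS (
    SELECT *,
           SUM(group_start) OVER (
               PARTITION BY {KEY} ORDER BY flowStartMilliseconds) AS rle_id
    FROM flagged
)
SELECT {KEY},
       MIN(flowStartMilliseconds) AS flowStartMilliseconds,
       MAX(flowEndMilliseconds) AS flowEndMilliseconds,
       SUM(bytes) AS bytes,      -- hypothetical column name
       SUM(packets) AS packets   -- hypothetical column name
FROM numbered
GROUP BY {KEY}, rle_id
ORDER BY {KEY}, MIN(flowStartMilliseconds)
"""

con = sqlite3.connect(":memory:")
con.execute("""CREATE TABLE flows (
    sourceIPAddress TEXT, destinationIPAddress TEXT,
    sourceTransportPort INTEGER, destinationTransportPort INTEGER,
    flowStartMilliseconds INTEGER, flowEndMilliseconds INTEGER,
    bytes INTEGER, packets INTEGER)""")
# Made-up rows: the first three are within an hour of each other,
# the fourth starts more than an hour after the third ends.
con.executemany("INSERT INTO flows VALUES (?, ?, ?, ?, ?, ?, ?, ?)", [
    ("10.0.0.1", "10.0.0.2", 1234, 443, 0, 1000, 100, 1),
    ("10.0.0.1", "10.0.0.2", 1234, 443, 2000, 3000, 50, 2),
    ("10.0.0.1", "10.0.0.2", 1234, 443, 10_000, 20_000, 25, 1),
    ("10.0.0.1", "10.0.0.2", 1234, 443, 9_000_000, 9_001_000, 7, 1),
])
result = con.execute(QUERY).fetchall()
for row in result:
    print(row)
```

In DuckDB, `FROM flows` would presumably be replaced by the `read_parquet(...)` call from the question, with the `protocol = 'TCP'` filter in the first CTE; that variant is not exercised here.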