How to merge similar rows in an SQL table based on specific conditions within a partition?

Question (votes: 0, answers: 1)

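I need to write an SQL query that merges multiple similar flows. Two flows count as similar if these columns are equal:

sourceIPAddress
destinationIPAddress
sourceTransportPort
destinationTransportPort

and the next flow record's flowStartMilliseconds is less than the previous flow's flowEndMilliseconds + (3600 x 1000), i.e. the next flow starts less than one hour after the previous one ended. If these conditions hold, I need to merge those flows (rows): sum the bytes and packets columns, use the first row's flowStartMilliseconds as the merged flowStartMilliseconds, and use the last row's flowEndMilliseconds as the merged flowEndMilliseconds.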

This is what I have done so far:

select 
  sourceIPAddress, 
  destinationIPAddress, 
  sourceTransportPort, 
  destinationTransportPort, 
  protocol, 
  serviceName, 
  flowStartMilliseconds, 
  flowEndMilliseconds, 
  row_number() over (
    partition by sourceIPAddress, 
    destinationIPAddress, 
    sourceTransportPort, 
    destinationTransportPort 
    order by 
      flowStartMilliseconds
  ) as row_num, 
  lag(flowEndMilliseconds) over (
    partition by sourceIPAddress, 
    destinationIPAddress, 
    sourceTransportPort, 
    destinationTransportPort 
    order by 
      flowStartMilliseconds
  ) as lag_value, 
  (lag_value + 3600 * 1000) as next_val, 
  -- 1 = this flow starts less than one hour after the previous flow ended
  case when flowStartMilliseconds < next_val then 1 else 0 end as condition
from 
  read_parquet(
    '//path/*/*/*/*/*/*.parquet', hive_partitioning = 1
  ) 
where 
  protocol = 'TCP';

I am using DuckDB for this query.

Here is a sample of the table: [screenshot]

In this screenshot, all three rows should be merged into a single row.

Tags: sql, duckdb
1 Answer
Votes: 0

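Since your question doesn't include a minimal, reproducible example, I created a simple dataframe to illustrate the task. I use smaller numbers for readability and use 3 as the threshold distance between rows at which a new group starts: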

┌──────────┬───────┬─────┐
│ group_id ┆ start ┆ end │
│ ---      ┆ ---   ┆ --- │
│ i64      ┆ i64   ┆ i64 │
╞══════════╪═══════╪═════╡
│ 1        ┆ 1     ┆ 5   │ # start of the continuous group 1
│ 1        ┆ 7     ┆ 8   │ # 7 < 5 + 3 => still in the group 1
│ 1        ┆ 10    ┆ 11  │ # 10 < 8 + 3 => still in the group 1
│ 1        ┆ 15    ┆ 16  │ # start of the continuous group 2
│ 1        ┆ 20    ┆ 23  │ # start of the continuous group 3
│ 1        ┆ 25    ┆ 30  │ # 25 < 23 + 3 => still in the group 3
│ 2        ┆ 2     ┆ 5   │ 
│ 2        ┆ 7     ┆ 10  │
└──────────┴───────┴─────┘
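The query below reads from `df`, which in this answer is a dataframe rather than a database table. To try the query in a plain DuckDB session, one minimal option is to recreate the same sample rows as a regular table named `df`. The column types are an assumption, and `end` is quoted because it is an SQL keyword:

```sql
-- Recreate the sample data above as a table named df (types assumed)
CREATE TABLE df (group_id BIGINT, start BIGINT, "end" BIGINT);

INSERT INTO df VALUES
    (1, 1, 5), (1, 7, 8), (1, 10, 11), (1, 15, 16),
    (1, 20, 23), (1, 25, 30),
    (2, 2, 5), (2, 7, 10);
```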
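with cte as (
    -- flag each row that starts a new run: start >= previous end + threshold (3)
    select
        d.group_id, d.start, d.end,
        lag(d.end) over (partition by d.group_id order by d.start) as lag_value,
        -- DuckDB allows reusing the alias lag_value later in the same SELECT
        (lag_value + 3) as next_val,
        -- the first row per group_id has lag_value = NULL, so it gets 0
        case when d.start >= next_val then 1 else 0 end as group_start
    from df as d
), cte2 as (
    -- running sum of the flags gives every continuous run its own id
    select
        d.group_id, d.start, d.end,
        sum(d.group_start) over(partition by d.group_id order by d.start) as rle_id
    from cte as d
)
-- collapse each run into a single row
select
    d.group_id,
    min(d.start) as start,
    max(d.end) as end,
    count(*) as cnt_records
from cte2 as d
group by
    d.group_id,
    d.rle_id
order by
    d.group_id,
    min(d.start)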

┌──────────┬───────┬─────┬─────────────┐
│ group_id ┆ start ┆ end ┆ cnt_records │
│ ---      ┆ ---   ┆ --- ┆ ---         │
│ i64      ┆ i64   ┆ i64 ┆ i64         │
╞══════════╪═══════╪═════╪═════════════╡
│ 1        ┆ 1     ┆ 11  ┆ 3           │
│ 1        ┆ 15    ┆ 16  ┆ 1           │
│ 1        ┆ 20    ┆ 30  ┆ 2           │
│ 2        ┆ 2     ┆ 10  ┆ 2           │
└──────────┴───────┴─────┴─────────────┘
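Applied to the question's own columns, the same "gaps and islands" pattern could look like the sketch below. It assumes the byte and packet columns are named `octets` and `packets` (hypothetical names; the question does not give them) and uses a small made-up `flows` table in place of the Parquet files. With the real data, `FROM flows` would be replaced by the question's `read_parquet(...)` call and its `protocol = 'TCP'` filter.

```sql
-- Hypothetical sample flows; octets/packets column names are assumptions.
CREATE TABLE flows (
    sourceIPAddress          VARCHAR,
    destinationIPAddress     VARCHAR,
    sourceTransportPort      INTEGER,
    destinationTransportPort INTEGER,
    flowStartMilliseconds    BIGINT,
    flowEndMilliseconds      BIGINT,
    octets                   BIGINT,
    packets                  BIGINT
);

INSERT INTO flows VALUES
    ('10.0.0.1', '10.0.0.2', 40000, 443,       0,   60000, 1000, 10),
    ('10.0.0.1', '10.0.0.2', 40000, 443, 1800000, 1900000,  500,  5),
    ('10.0.0.1', '10.0.0.2', 40000, 443, 6000000, 6100000,  200,  2),
    ('10.0.0.1', '10.0.0.3', 40001,  80,       0,   10000,  300,  3);

CREATE VIEW merged_flows AS
WITH flagged AS (
    -- 1 when a flow starts at least one hour (3600 * 1000 ms) after the
    -- previous flow with the same key ended; lag() is NULL for the first
    -- flow of each key, so the comparison is NULL and it falls into ELSE 0
    SELECT *,
        CASE WHEN flowStartMilliseconds >=
                  (lag(flowEndMilliseconds) OVER w) + 3600 * 1000
             THEN 1 ELSE 0 END AS new_session
    FROM flows
    WINDOW w AS (
        PARTITION BY sourceIPAddress, destinationIPAddress,
                     sourceTransportPort, destinationTransportPort
        ORDER BY flowStartMilliseconds, flowEndMilliseconds
    )
), numbered AS (
    -- running sum of the flags = id of the merged flow within each key
    SELECT *,
        sum(new_session) OVER (
            PARTITION BY sourceIPAddress, destinationIPAddress,
                         sourceTransportPort, destinationTransportPort
            ORDER BY flowStartMilliseconds, flowEndMilliseconds
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS session_id
    FROM flagged
)
-- one output row per merged flow
SELECT
    sourceIPAddress, destinationIPAddress,
    sourceTransportPort, destinationTransportPort,
    min(flowStartMilliseconds) AS flowStartMilliseconds,
    max(flowEndMilliseconds)   AS flowEndMilliseconds,
    sum(octets)                AS octets,
    sum(packets)               AS packets,
    count(*)                   AS cnt_records
FROM numbered
GROUP BY sourceIPAddress, destinationIPAddress,
         sourceTransportPort, destinationTransportPort, session_id;

SELECT * FROM merged_flows
ORDER BY sourceIPAddress, destinationIPAddress, flowStartMilliseconds;
```

With these sample rows, the first two flows to 10.0.0.2 merge (1,800,000 < 60,000 + 3,600,000), while the third starts a new merged flow (6,000,000 >= 1,900,000 + 3,600,000). Like the answer, this compares each flow only with the immediately preceding one and takes `max()` of the end times. If one long flow can fully contain later ones, comparing against a running maximum of `flowEndMilliseconds` instead of `lag()` would be the more robust choice.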
© www.soinside.com 2019 - 2024. All rights reserved.