我有一个事件表,用作分布式计算系统的日志。
看起来是这样的:
CREATE TABLE events(
id SERIAL PRIMARY KEY,
occurred_at timestamp with time zone DEFAULT now()
);
为了简单起见,我排除了表中一些额外的、不相关的信息。
系统运行时通常会经历大量活动,然后活动很少的时期,我想追溯地识别这些时期。
理想情况下会有一些函数
identify_activity_periods(tolerance text)
可以输出这些周期。 tolerance
这里代表一段时间的有效文本描述,如 Postgres 日期比较中使用的,例如 '1h'
或 '3m'
要手动查找它们,您必须按时间对每个事件进行排序,并将上一个/下一个事件的
tolerance
内发生的所有事件按顺序聚集到同一活动周期中。这将返回一个包含这些独特活动周期的表,包括结束该周期的开始事件和最终事件(然后您可以使用它来在视图中公开开始和结束时间)。
这可能是一个相当简单的分组,但它的表达却让我困惑(以及我试图找到类似问题的尝试)。
你会怎么做?
如果事件表有类似的数据
id: time stamp
3: now()
2: now()-'1m'
1: now()-'1h'
那么您会期望
identify_activity_periods('10m')
返回两个活动周期:
id: start_event, end_event
2: 2, 3
1: 1, 1
你会期望
identify_activity_periods('2h')
回来
id: start_event, end_event
1: 1, 3
ordered_events
CTEclusters
CTEcluster_groups
CTEactivity_periods
CTECREATE OR REPLACE FUNCTION identify_activity_periods(tolerance INTERVAL)
RETURNS TABLE(id INT, start_event INT, end_event INT) AS $$
BEGIN
RETURN QUERY
WITH ordered_events AS (
SELECT
e.id AS event_id,
e.occurred_at,
ROW_NUMBER() OVER (ORDER BY e.occurred_at)::INT AS rn
FROM events e
),
clusters AS (
SELECT
oe.event_id,
oe.occurred_at,
CASE
WHEN oe.occurred_at - LAG(oe.occurred_at) OVER (ORDER BY oe.occurred_at) <= tolerance THEN 0
ELSE 1
END AS cluster_start,
ROW_NUMBER() OVER (ORDER BY oe.occurred_at)::INT AS rn
FROM ordered_events oe
),
cluster_groups AS (
SELECT
c.event_id,
c.occurred_at,
SUM(c.cluster_start) OVER (ORDER BY c.occurred_at)::INT AS cluster_id
FROM clusters c
),
activity_periods AS (
SELECT
cg.cluster_id,
MIN(cg.event_id) AS start_event,
MAX(cg.event_id) AS end_event
FROM cluster_groups cg
GROUP BY cg.cluster_id
)
SELECT
ROW_NUMBER() OVER (ORDER BY ap.cluster_id)::INT AS id,
ap.start_event,
ap.end_event
FROM activity_periods ap
ORDER BY ap.start_event;
END;
$$ LANGUAGE plpgsql;
输出
SELECT * FROM identify_activity_periods('10 minutes');
id | 开始事件 | 结束_活动 |
---|---|---|
1 | 1 | 1 |
2 | 2 | 3 |
SELECT * FROM identify_activity_periods('2 hours');
id | 开始事件 | 结束_活动 |
---|---|---|
1 | 1 | 3 |