我有一个按 event_timestamp 分区的 PostgreSQL 事件表:
CREATE TABLE events
(
id SERIAL PRIMARY KEY,
event_timestamp TIMESTAMP NOT NULL,
processed BOOLEAN DEFAULT FALSE,
payload JSONB
) PARTITION BY RANGE (event_timestamp);
目前,单个工作人员轮询并处理事件,将它们标记为已处理以避免重新处理。使用的查询是:
SELECT *
FROM events
WHERE processed = false
ORDER BY event_timestamp
LIMIT 10_000;
为了提高吞吐量,我需要多名工人。然而,这存在重复处理的风险,因为工作人员可能会同时选择相同的事件。
我正在寻求一种有效的策略,允许多个工作人员同时处理事件而不重复。该解决方案应确保每个事件仅处理一次。我怎样才能在 PostgreSQL 中实现这一目标?任何指导或示例将不胜感激。
您可以使用显式行锁。在这个 FOR UPDATE SKIP LOCKED
的末尾添加
select
就是这样:
SELECT *
FROM events
WHERE processed = false
ORDER BY event_timestamp
LIMIT 10_000
FOR UPDATE SKIP LOCKED;--here
一旦工作人员 A 读取了 10k 行,他们就会将其锁定
FOR UPDATE
直到他们 COMMIT
或 ROLLBACK
他们的事务。如果在此之前另一个工作人员请求另外 10k,他们会看到前 10k 被锁定,他们会跳过它们,感谢SKIP LOCKED
。
确保您的工作人员使用单独的会话/事务 - 某些连接池可以配置为针对不同的查询重复使用相同的会话和事务,这不适用于此类锁定。