我有一个存储过程,每隔一小时运行一次,以生成服务中发生的事务的摘要。存储过程从名为 transaction_log 的表中读取数据,并填充(插入或更新行)另一个名为 transaction_log_summary 的表。表之间在约束级别上没有连接,即没有外键引用等。但是由于某种原因,当存储过程在一天中第一次运行并且同时在 transaction_log 上执行了新的插入查询时一次,插入语句失败并超出锁定等待超时;尝试重新启动事务错误。这里使用的默认隔离级别是REPEATABLE READ (innoDB)。读操作如何锁定表以防止插入新行?
下面分享存储过程和相关代码。
用于更新汇总表中条目的函数
DELIMITER //
CREATE PROCEDURE func_update_summary(IN date_param DATE, IN last_summary_index BIGINT)
BEGIN
-- Update the summary for the input date with logs having newer ids
UPDATE transaction_log_summary tls
JOIN (
SELECT
DATE(created_at) AS date,
SUM(CASE WHEN channel = 'NATS' THEN 1 ELSE 0 END) AS channel_NATS,
SUM(CASE WHEN channel = 'SQS' THEN 1 ELSE 0 END) AS channel_SQS,
SUM(CASE WHEN channel = 'API' THEN 1 ELSE 0 END) AS channel_API,
SUM(CASE WHEN status = 'SUCCESS' THEN 1 ELSE 0 END) AS status_SUCCESS,
SUM(CASE WHEN status = 'FAILURE' THEN 1 ELSE 0 END) AS status_FAILURE,
SUM(CASE WHEN status = 'SKIPPED' THEN 1 ELSE 0 END) AS status_SKIPPED,
MAX(id) AS last_transaction_log_index
FROM
transaction_log
WHERE
DATE(created_at) = date_param
AND id > last_summary_index
GROUP BY
DATE(created_at)
) AS temp ON tls.date = temp.date
SET
tls.channel_NATS = tls.channel_NATS + temp.channel_NATS,
tls.channel_SQS = tls.channel_SQS + temp.channel_SQS,
tls.channel_API = tls.channel_API + temp.channel_API,
tls.status_SUCCESS = tls.status_SUCCESS + temp.status_SUCCESS,
tls.status_FAILURE = tls.status_FAILURE + temp.status_FAILURE,
tls.status_SKIPPED = tls.status_SKIPPED + temp.status_SKIPPED,
tls.last_transaction_log_index = temp.last_transaction_log_index
WHERE
tls.date = date_param;
END//
DELIMITER ;
使用上述函数执行日期检查并更新值的存储过程
DELIMITER //
CREATE PROCEDURE proc_update_transaction_log_summary()
BEGIN
DECLARE present_date DATE;
DECLARE last_summary_date DATE;
DECLARE last_summary_index BIGINT;
DECLARE last_log_index BIGINT;
DECLARE has_records INT;
DECLARE error_code INT;
DECLARE error_msg VARCHAR(255);
DECLARE lock_acquired INT DEFAULT 0;
-- Try to acquire the named lock immediately
SET lock_acquired = GET_LOCK('update_summary_lock', 0); -- 0 means try to acquire immediately
IF lock_acquired = 1 THEN
BEGIN
DECLARE EXIT HANDLER FOR SQLEXCEPTION
BEGIN
GET DIAGNOSTICS CONDITION 1 error_code = MYSQL_ERRNO, error_msg = MESSAGE_TEXT;
ROLLBACK;
SELECT CONCAT('Error ', error_code, ': ', error_msg);
DO RELEASE_LOCK('update_summary_lock');
END;
START TRANSACTION;
SET present_date = CURDATE();
-- Get the last index and date from transaction_log_summary table
SELECT last_transaction_log_index, date INTO last_summary_index, last_summary_date
FROM transaction_log_summary
ORDER BY date DESC
LIMIT 1;
-- If last summary date is less than current date, update the summary
IF last_summary_date < present_date THEN
BEGIN
-- Get the last index from the transaction log for the last_summary_date
SELECT MAX(id) INTO last_log_index FROM transaction_log
WHERE DATE(created_at) = last_summary_date;
IF last_log_index > last_summary_index THEN
CALL func_update_summary(last_summary_date, last_summary_index);
END IF;
-- Check if there are records for the present date
SELECT EXISTS ( SELECT 1 FROM transaction_log WHERE DATE(created_at) =
CURRENT_DATE LIMIT 1) INTO has_records;
IF has_records THEN
-- Insert new record in the summary table for the current date
INSERT INTO transaction_log_summary (date, channel_NATS, channel_SQS, channel_API, status_SUCCESS, status_FAILURE, status_SKIPPED, last_transaction_log_index)
SELECT present_date,
SUM(CASE WHEN channel = 'NATS' THEN 1 ELSE 0 END) AS channel_NATS,
SUM(CASE WHEN channel = 'SQS' THEN 1 ELSE 0 END) AS channel_SQS,
SUM(CASE WHEN channel = 'API' THEN 1 ELSE 0 END) AS channel_API,
SUM(CASE WHEN status = 'SUCCESS' THEN 1 ELSE 0 END) AS status_SUCCESS,
SUM(CASE WHEN status = 'FAILURE' THEN 1 ELSE 0 END) AS status_FAILURE,
SUM(CASE WHEN status = 'SKIPPED' THEN 1 ELSE 0 END) AS status_SKIPPED,
MAX(id) AS last_transaction_log_index
FROM transaction_log
WHERE DATE(created_at) = present_date;
ELSE
-- Insert new record in the summary table for the present date with last index from previous day
INSERT INTO transaction_log_summary (date, last_transaction_log_index)
SELECT present_date, MAX(id) AS last_transaction_log_index
FROM transaction_log;
END IF;
END;
ELSE
BEGIN
-- Get the last index from the transaction log for the last_summary_date
SELECT MAX(id) INTO last_log_index FROM transaction_log
WHERE DATE(created_at) = present_date;
-- Check if there are new entries in the log table
IF last_log_index > last_summary_index THEN
CALL func_update_summary(present_date, last_summary_index);
END IF;
END;
END IF;
COMMIT;
SELECT ''; -- empty msg utilised by app layer to identify successful end of execution
DO RELEASE_LOCK('update_summary_lock');
END;
END IF;
END//
DELIMITER ;
事件计划每隔一小时执行一次该过程
DELIMITER //
CREATE EVENT job_update_transaction_log_summary
ON SCHEDULE
EVERY 1 HOUR
DO
BEGIN
CALL proc_update_transaction_log_summary();
END//
DELIMITER ;
此外,观察到在存储过程隔离级别更新为 READ COMMITTED 的部署环境之一中,我们没有遇到此问题。 即在存储过程中,在事务开始之前,我们设置隔离级别。
SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;
但是读操作怎么会导致这个问题呢?存储过程的不同隔离级别的设置是否真正解决了问题,还是我们会在不同的用例中遇到该问题?
作为 DML 语句 (
INSERT
/UPDATE
/DELETE
) 一部分执行的任何读查询都隐式是锁定读。
https://dev.mysql.com/doc/refman/8.0/en/innodb-locks-set.html 说:
设置独占索引 插入 T 的每一行上的记录锁(没有间隙锁)。如果 事务隔离级别是 READ COMMITTED,InnoDB 进行搜索 在 S 上作为一致读取(无锁)。否则,InnoDB 设置共享 来自 S 的行上的下一个键锁。InnoDB 必须在后者中设置锁 案例:在使用基于语句的二进制日志进行前滚恢复期间, 每个 SQL 语句必须以完全相同的方式执行 原来已经完成了。INSERT INTO T SELECT ... FROM S WHERE ...
使用共享的下一个键执行 SELECT 锁定或作为一致读取,如CREATE TABLE ... SELECT ...
。INSERT ... SELECT
当
用于构造SELECT
或REPLACE INTO t SELECT ... FROM s WHERE ...
时, InnoDB 对表 s 中的行设置共享下一键锁。UPDATE t ... WHERE col IN (SELECT ... FROM s ...)
该手册页没有明确列出这些情况,但这也会影响以下情况:
SELECT ... INTO @var;
SET @var = (SELECT ...);
UPDATE ... JOIN (SELECT...)
Foreign key checks
SELECT inside a DML trigger