MySQL deadlock when using SELECT FOR UPDATE SKIP LOCKED?


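I have a queue system built with PHP and MySQL (I originally built it on Redis, but that was somewhat complex and not very flexible). After some research I found that, starting with MySQL 8.0.1, there is a new locking-read option that makes it possible to build queues on MySQL (efficiently? reliably?): FOR UPDATE SKIP LOCKED. From what I have read, it is supposed to prevent deadlocks. Some references: ref 1, ref 2, ref 3.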

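In my current implementation I have two tables, QueueJobsPending and QueueJobsProcessed. As the names suggest, the first holds only pending jobs and the second holds processed jobs (both failed and successful). The tables are defined as follows.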

CREATE TABLE QueueJobsPending (
    id CHAR(15) BINARY UNIQUE NOT NULL, PRIMARY KEY (id),
    jobKey VARCHAR(255) NOT NULL,
    jobState ENUM('Pending', 'Processing') NOT NULL,
    queueName VARCHAR(255) NOT NULL,
    params JSON NOT NULL,
    tags JSON NOT NULL,
    maxExecutionTime INTEGER NOT NULL,
    maxReleases INTEGER NOT NULL,
    releasesCounter INTEGER NOT NULL,
    maxRetries INTEGER NOT NULL,
    retriesCounter INTEGER NOT NULL,
    executeAfterTimestamp INTEGER UNSIGNED NOT NULL,
    createdAt TIMESTAMP NOT NULL,
    updatedAt TIMESTAMP NULL,
    sequence BIGINT UNSIGNED NOT NULL AUTO_INCREMENT, KEY sequence (sequence)
);

CREATE INDEX index_jobKey ON QueueJobsPending(jobKey);
CREATE INDEX index_jobState ON QueueJobsPending(jobState);
CREATE INDEX index_queueName ON QueueJobsPending(queueName);
CREATE INDEX index_executeAfterTimestamp ON QueueJobsPending(executeAfterTimestamp);

CREATE TABLE QueueJobsProcessed (
    id CHAR(15) BINARY UNIQUE NOT NULL, PRIMARY KEY (id),
    pendingQueueJobId CHAR(15) BINARY UNIQUE NOT NULL, -- ONLY FOR THE PURPOSES OF ENSURING A PENDING JOB IS NOT PROCESSED TWICE
    jobKey VARCHAR(255) NOT NULL,
    jobState ENUM('Completed','Failed') NOT NULL,
    queueName VARCHAR(255) NOT NULL,
    params JSON NOT NULL,
    tags JSON NOT NULL,
    errors JSON NOT NULL,
    maxExecutionTime INTEGER NOT NULL,
    maxReleases INTEGER NOT NULL,
    releasesCounter INTEGER NOT NULL,
    maxRetries INTEGER NOT NULL,
    retriesCounter INTEGER NOT NULL,
    executeAfterTimestamp INTEGER UNSIGNED NOT NULL,
    createdAt TIMESTAMP NOT NULL,
    sequence BIGINT UNSIGNED NOT NULL AUTO_INCREMENT, KEY sequence (sequence)
);

CREATE INDEX index_jobKey ON QueueJobsProcessed(jobKey);
CREATE INDEX index_jobState ON QueueJobsProcessed(jobState);
CREATE INDEX index_queueName ON QueueJobsProcessed(queueName);

When retrieving the next job to process, I run the following two queries inside a transaction.

SELECT *
FROM QueueJobsPending
WHERE
    jobState = 'Pending'
    AND executeAfterTimestamp <= 123456789
    AND queueName IN ('High','Medium','Low')
ORDER BY FIELD(queueName, 'High','Medium','Low'), sequence ASC
LIMIT 1
FOR UPDATE SKIP LOCKED;

If there is a pending job, I immediately update its state to Processing with the following query.

UPDATE QueueJobsPending
SET jobKey = 'SendCompanyUserAppointmentRescheduledEmail',jobState = 'Processing',queueName = 'Medium',params = '{\"orderId\":\"jNpAhJ9oOLqJkjv\"}',tags = '{\"orderId\":\"jNpAhJ9oOLqJkjv\",\"companyId\":\"87ZIKFuNgI14YSW\"}',maxExecutionTime = '30',maxReleases = '0',releasesCounter = '0',maxRetries = '3',retriesCounter = '0',executeAfterTimestamp = '1733005138',createdAt = '2024-11-30 22:18:58',updatedAt = '2024-11-30 22:18:59'
WHERE id = 'K4cr7SCAaIQDsxl'

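Note: I update the state to Processing before actually processing the job so that the two queries run as close together as possible and no other worker picks up the same job (which apparently doesn't always go as planned, otherwise I wouldn't be here 😭).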

Finally, I commit the transaction.

Here is the exact PHP code from my repository that fetches the next job to process.

// PendingQueueJobRepositoryMySQL.php

public function getNextJob(array $queueNames): ?PendingQueueJob
{
    if (empty($queueNames)) {
        throw new QueueException('Queue name list should not be empty.');
    }

    $pendingQueueJobColumns = $this->pendingQueueJobHydrator->getTableColumnsForSelect();
    $now = $this->clock->now(TimeZoneEnum::UTC)->toTimestamp();

    $queueNames = array_map(fn (string $queueName) => $this->quote($queueName), $queueNames);
    $queueNamesStr = implode(',', $queueNames);

    $pendingStateStr = PendingQueueJobStateEnum::Pending->value;

    try {
        $this->beginTransaction();

        $query = "
            SELECT {$pendingQueueJobColumns}
            FROM QueueJobsPending
            WHERE
                jobState = '{$pendingStateStr}'
                AND executeAfterTimestamp <= '{$now}'
                AND queueName IN ({$queueNamesStr})
            ORDER BY FIELD(queueName, {$queueNamesStr}), sequence ASC
            LIMIT 1
            FOR UPDATE SKIP LOCKED;
        ";

        $record = $this->fetchSingle($query);
        $job = null;

        if ($record) {
            $job = $this->pendingQueueJobHydrator->single($record);
            $job = $job->setProcessing();

            $this->update($job);
        }

        $this->commitTransaction();

        return $job;
    } catch (\Throwable $th) {
        $this->rollbackTransaction();

        throw $th;
    }
}

// PendingQueueJob.php file

public function setProcessing(): self
{
    return new self(
        // Update
        jobState: PendingQueueJobStateEnum::Processing,
        updatedAt: AppDateTime::now(),
        //Same
        id: $this->id,
        jobKey: $this->jobKey,
        queueName: $this->queueName,
        params: $this->params,
        tags: $this->tags,
        maxExecutionTime: $this->maxExecutionTime,
        maxReleases: $this->maxReleases,
        releasesCounter: $this->releasesCounter,
        maxRetries: $this->maxRetries,
        retriesCounter: $this->retriesCounter,
        executeAfterTimestamp: $this->executeAfterTimestamp,
        createdAt: $this->createdAt,
    );
}

// PendingQueueJobRepositoryMySQL.php

public function update(PendingQueueJob $entity): void
{
    $columnsForUpdate = $this->pendingQueueJobHydrator->getTableColumnsForUpdate();

    $query = "
        UPDATE QueueJobsPending
        SET {$columnsForUpdate}
        WHERE id = :id;
    ";

    $data = $this->pendingQueueJobHydrator->serialize($entity);

    $this->execute($query, $data);
}

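If getNextJob returns an instance (which by then should already be set to Processing), the worker processes it and, once done, hard-deletes it from the QueueJobsPending table and moves it to the QueueJobsProcessed one.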

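However, after testing my application I noticed that I was getting deadlock exceptions like this one (this logging style is only used locally for development).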

Array
(
    [loggedAt] => 2024-11-30 22:18:59.563000
    [content] => Array
        (
            [exception] => App\Support\Infrastructure\Exception\DatabaseException
            [message] => SQLSTATE[40001]: Serialization failure: 1213 Deadlock found when trying to get lock; try restarting transaction
            [file] => /var/www/app/backend/App/Support/Infrastructure/Database/Repository/AbstractPDORepository.php
            [line] => 116
            [context] => Array
                (
                    [params] => Array
                        (
                            [id] => K4cr7SCAaIQDsxl
                            [jobKey] => SendCompanyUserAppointmentRescheduledEmail
                            [jobState] => Processing
                            [queueName] => Medium
                            [params] => {"orderId":"jNpAhJ9oOLqJkjv"}
                            [tags] => {"orderId":"jNpAhJ9oOLqJkjv","companyId":"87ZIKFuNgI14YSW"}
                            [maxExecutionTime] => 30
                            [maxReleases] => 0
                            [releasesCounter] => 0
                            [maxRetries] => 3
                            [retriesCounter] => 0
                            [executeAfterTimestamp] => 1733005138
                            [createdAt] => 2024-11-30 22:18:58
                            [updatedAt] => 2024-11-30 22:18:59
                        )

                    [query] => 
            UPDATE QueueJobsPending
            SET jobKey = :jobKey,jobState = :jobState,queueName = :queueName,params = :params,tags = :tags,maxExecutionTime = :maxExecutionTime,maxReleases = :maxReleases,releasesCounter = :releasesCounter,maxRetries = :maxRetries,retriesCounter = :retriesCounter,executeAfterTimestamp = :executeAfterTimestamp,createdAt = :createdAt,updatedAt = :updatedAt
            WHERE id = :id;
        
                )
        )

    [stackTrace] => Array
        (
            [0] => disabled
        )

)
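
As an aside on the message above: MySQL's error 1213 text itself says to restart the transaction. Below is a minimal sketch of such a retry, assuming the raw PDOException is catchable (the log shows it wrapped in DatabaseException, and how that wrapper exposes the driver code is not shown here). The names isDeadlock and retryOnDeadlock are hypothetical and not part of the code in this post.

```php
<?php
declare(strict_types=1);

/**
 * Returns true when the exception carries MySQL driver error 1213 (deadlock).
 * Hypothetical helper, not part of the repository shown in this post.
 */
function isDeadlock(\PDOException $e): bool
{
    // When PDO fills it in, errorInfo is [SQLSTATE, driver error code, driver message].
    return isset($e->errorInfo[1]) && (int) $e->errorInfo[1] === 1213;
}

/**
 * Runs $transaction and runs it again if it fails with a deadlock,
 * giving up after $maxAttempts attempts. Other errors are rethrown at once.
 */
function retryOnDeadlock(callable $transaction, int $maxAttempts = 3): mixed
{
    for ($attempt = 1; ; $attempt++) {
        try {
            return $transaction();
        } catch (\PDOException $e) {
            if (!isDeadlock($e) || $attempt >= $maxAttempts) {
                throw $e;
            }
            // Short randomized pause so competing workers do not retry in lockstep.
            usleep(random_int(1_000, 10_000) * $attempt);
        }
    }
}
```

A call might then look like retryOnDeadlock(fn () => $repository->getNextJob($queueNames)), where $repository is assumed to be the repository shown earlier. Retrying only works around the symptom; it does not explain why two connections end up contending for the same row.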

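Here are the relevant logs from MySQL for that specific deadlock (I believe).

Search for the string K4cr7SCAaIQDsxl, which is the ID of the entity involved in the deadlock. Near the end you will see two UPDATE statements for that ID, one that succeeded and one that was rolled back.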

2024-11-30T22:18:59.521235Z    15 Execute   SELECT QueueJobsPending.id AS QueueJobsPending_id,QueueJobsPending.jobKey AS QueueJobsPending_jobKey,QueueJobsPending.jobState AS QueueJobsPending_jobState,QueueJobsPending.queueName AS QueueJobsPending_queueName,QueueJobsPending.params AS QueueJobsPending_params,QueueJobsPending.tags AS QueueJobsPending_tags,QueueJobsPending.maxExecutionTime AS QueueJobsPending_maxExecutionTime,QueueJobsPending.maxReleases AS QueueJobsPending_maxReleases,QueueJobsPending.releasesCounter AS QueueJobsPending_releasesCounter,QueueJobsPending.maxRetries AS QueueJobsPending_maxRetries,QueueJobsPending.retriesCounter AS QueueJobsPending_retriesCounter,QueueJobsPending.executeAfterTimestamp AS QueueJobsPending_executeAfterTimestamp,QueueJobsPending.createdAt AS QueueJobsPending_createdAt,QueueJobsPending.updatedAt AS QueueJobsPending_updatedAt
                FROM QueueJobsPending
                WHERE
                    jobState = 'Pending'
                    AND executeAfterTimestamp <= '1733005139'
                    AND queueName IN ('low','high','medium')
                ORDER BY FIELD(queueName, 'low','high','medium'), sequence ASC
                LIMIT 1
                FOR UPDATE SKIP LOCKED
2024-11-30T22:18:59.521267Z    21 Prepare   SELECT QueueJobsPending.id AS QueueJobsPending_id,QueueJobsPending.jobKey AS QueueJobsPending_jobKey,QueueJobsPending.jobState AS QueueJobsPending_jobState,QueueJobsPending.queueName AS QueueJobsPending_queueName,QueueJobsPending.params AS QueueJobsPending_params,QueueJobsPending.tags AS QueueJobsPending_tags,QueueJobsPending.maxExecutionTime AS QueueJobsPending_maxExecutionTime,QueueJobsPending.maxReleases AS QueueJobsPending_maxReleases,QueueJobsPending.releasesCounter AS QueueJobsPending_releasesCounter,QueueJobsPending.maxRetries AS QueueJobsPending_maxRetries,QueueJobsPending.retriesCounter AS QueueJobsPending_retriesCounter,QueueJobsPending.executeAfterTimestamp AS QueueJobsPending_executeAfterTimestamp,QueueJobsPending.createdAt AS QueueJobsPending_createdAt,QueueJobsPending.updatedAt AS QueueJobsPending_updatedAt
                FROM QueueJobsPending
                WHERE
                    jobState = 'Pending'
                    AND executeAfterTimestamp <= '1733005139'
                    AND queueName IN ('medium','high','low')
                ORDER BY FIELD(queueName, 'medium','high','low'), sequence ASC
                LIMIT 1
                FOR UPDATE SKIP LOCKED
2024-11-30T22:18:59.522084Z    21 Execute   SELECT QueueJobsPending.id AS QueueJobsPending_id,QueueJobsPending.jobKey AS QueueJobsPending_jobKey,QueueJobsPending.jobState AS QueueJobsPending_jobState,QueueJobsPending.queueName AS QueueJobsPending_queueName,QueueJobsPending.params AS QueueJobsPending_params,QueueJobsPending.tags AS QueueJobsPending_tags,QueueJobsPending.maxExecutionTime AS QueueJobsPending_maxExecutionTime,QueueJobsPending.maxReleases AS QueueJobsPending_maxReleases,QueueJobsPending.releasesCounter AS QueueJobsPending_releasesCounter,QueueJobsPending.maxRetries AS QueueJobsPending_maxRetries,QueueJobsPending.retriesCounter AS QueueJobsPending_retriesCounter,QueueJobsPending.executeAfterTimestamp AS QueueJobsPending_executeAfterTimestamp,QueueJobsPending.createdAt AS QueueJobsPending_createdAt,QueueJobsPending.updatedAt AS QueueJobsPending_updatedAt
                FROM QueueJobsPending
                WHERE
                    jobState = 'Pending'
                    AND executeAfterTimestamp <= '1733005139'
                    AND queueName IN ('medium','high','low')
                ORDER BY FIELD(queueName, 'medium','high','low'), sequence ASC
                LIMIT 1
                FOR UPDATE SKIP LOCKED

...
...
...
...

2024-11-30T22:18:59.530642Z    12 Execute   SELECT QueueJobsPending.id AS QueueJobsPending_id,QueueJobsPending.jobKey AS QueueJobsPending_jobKey,QueueJobsPending.jobState AS QueueJobsPending_jobState,QueueJobsPending.queueName AS QueueJobsPending_queueName,QueueJobsPending.params AS QueueJobsPending_params,QueueJobsPending.tags AS QueueJobsPending_tags,QueueJobsPending.maxExecutionTime AS QueueJobsPending_maxExecutionTime,QueueJobsPending.maxReleases AS QueueJobsPending_maxReleases,QueueJobsPending.releasesCounter AS QueueJobsPending_releasesCounter,QueueJobsPending.maxRetries AS QueueJobsPending_maxRetries,QueueJobsPending.retriesCounter AS QueueJobsPending_retriesCounter,QueueJobsPending.executeAfterTimestamp AS QueueJobsPending_executeAfterTimestamp,QueueJobsPending.createdAt AS QueueJobsPending_createdAt,QueueJobsPending.updatedAt AS QueueJobsPending_updatedAt
                FROM QueueJobsPending
                WHERE
                    jobState = 'Pending'
                    AND executeAfterTimestamp <= '1733005139'
                    AND queueName IN ('high','medium','low')
                ORDER BY FIELD(queueName, 'high','medium','low'), sequence ASC
                LIMIT 1
                FOR UPDATE SKIP LOCKED
2024-11-30T22:18:59.531114Z    12 Close stmt    
2024-11-30T22:18:59.531218Z    12 Query COMMIT
2024-11-30T22:18:59.531533Z    21 Close stmt    
2024-11-30T22:18:59.531854Z    17 Prepare   SELECT QueueJobsPending.id AS QueueJobsPending_id,QueueJobsPending.jobKey AS QueueJobsPending_jobKey,QueueJobsPending.jobState AS QueueJobsPending_jobState,QueueJobsPending.queueName AS QueueJobsPending_queueName,QueueJobsPending.params AS QueueJobsPending_params,QueueJobsPending.tags AS QueueJobsPending_tags,QueueJobsPending.maxExecutionTime AS QueueJobsPending_maxExecutionTime,QueueJobsPending.maxReleases AS QueueJobsPending_maxReleases,QueueJobsPending.releasesCounter AS QueueJobsPending_releasesCounter,QueueJobsPending.maxRetries AS QueueJobsPending_maxRetries,QueueJobsPending.retriesCounter AS QueueJobsPending_retriesCounter,QueueJobsPending.executeAfterTimestamp AS QueueJobsPending_executeAfterTimestamp,QueueJobsPending.createdAt AS QueueJobsPending_createdAt,QueueJobsPending.updatedAt AS QueueJobsPending_updatedAt
                FROM QueueJobsPending
                WHERE
                    jobState = 'Pending'
                    AND executeAfterTimestamp <= '1733005139'
                    AND queueName IN ('low','high','medium')
                ORDER BY FIELD(queueName, 'low','high','medium'), sequence ASC
                LIMIT 1
                FOR UPDATE SKIP LOCKED
2024-11-30T22:18:59.531918Z    17 Execute   SELECT QueueJobsPending.id AS QueueJobsPending_id,QueueJobsPending.jobKey AS QueueJobsPending_jobKey,QueueJobsPending.jobState AS QueueJobsPending_jobState,QueueJobsPending.queueName AS QueueJobsPending_queueName,QueueJobsPending.params AS QueueJobsPending_params,QueueJobsPending.tags AS QueueJobsPending_tags,QueueJobsPending.maxExecutionTime AS QueueJobsPending_maxExecutionTime,QueueJobsPending.maxReleases AS QueueJobsPending_maxReleases,QueueJobsPending.releasesCounter AS QueueJobsPending_releasesCounter,QueueJobsPending.maxRetries AS QueueJobsPending_maxRetries,QueueJobsPending.retriesCounter AS QueueJobsPending_retriesCounter,QueueJobsPending.executeAfterTimestamp AS QueueJobsPending_executeAfterTimestamp,QueueJobsPending.createdAt AS QueueJobsPending_createdAt,QueueJobsPending.updatedAt AS QueueJobsPending_updatedAt
                FROM QueueJobsPending
                WHERE
                    jobState = 'Pending'
                    AND executeAfterTimestamp <= '1733005139'
                    AND queueName IN ('low','high','medium')
                ORDER BY FIELD(queueName, 'low','high','medium'), sequence ASC
                LIMIT 1
                FOR UPDATE SKIP LOCKED
2024-11-30T22:18:59.532387Z    21 Prepare   UPDATE QueueJobsPending
            SET jobKey = ?,jobState = ?,queueName = ?,params = ?,tags = ?,maxExecutionTime = ?,maxReleases = ?,releasesCounter = ?,maxRetries = ?,retriesCounter = ?,executeAfterTimestamp = ?,createdAt = ?,updatedAt = ?
            WHERE id = ?
2024-11-30T22:18:59.532511Z    17 Close stmt    
2024-11-30T22:18:59.532562Z    21 Execute   UPDATE QueueJobsPending
            SET jobKey = 'SendCustomerAppointmentRescheduledEmail',jobState = 'Processing',queueName = 'Medium',params = '{\"orderId\":\"jNpAhJ9oOLqJkjv\"}',tags = '{\"orderId\":\"jNpAhJ9oOLqJkjv\",\"companyId\":\"87ZIKFuNgI14YSW\"}',maxExecutionTime = '30',maxReleases = '0',releasesCounter = '0',maxRetries = '3',retriesCounter = '0',executeAfterTimestamp = '1733005138',createdAt = '2024-11-30 22:18:58',updatedAt = '2024-11-30 22:18:59'
            WHERE id = 'fZCR4WbvNwxAb1Y'
2024-11-30T22:18:59.532801Z    17 Query COMMIT
2024-11-30T22:18:59.533188Z    15 Prepare   UPDATE QueueJobsPending
            SET jobKey = ?,jobState = ?,queueName = ?,params = ?,tags = ?,maxExecutionTime = ?,maxReleases = ?,releasesCounter = ?,maxRetries = ?,retriesCounter = ?,executeAfterTimestamp = ?,createdAt = ?,updatedAt = ?
            WHERE id = ?
2024-11-30T22:18:59.533412Z    15 Execute   UPDATE QueueJobsPending
            SET jobKey = 'SendCompanyUserAppointmentRescheduledEmail',jobState = 'Processing',queueName = 'Medium',params = '{\"orderId\":\"jNpAhJ9oOLqJkjv\"}',tags = '{\"orderId\":\"jNpAhJ9oOLqJkjv\",\"companyId\":\"87ZIKFuNgI14YSW\"}',maxExecutionTime = '30',maxReleases = '0',releasesCounter = '0',maxRetries = '3',retriesCounter = '0',executeAfterTimestamp = '1733005138',createdAt = '2024-11-30 22:18:58',updatedAt = '2024-11-30 22:18:59'
            WHERE id = 'K4cr7SCAaIQDsxl'
2024-11-30T22:18:59.558046Z    21 Close stmt    
2024-11-30T22:18:59.558112Z    21 Query COMMIT
2024-11-30T22:18:59.562778Z    15 Close stmt    
2024-11-30T22:18:59.562845Z    15 Query ROLLBACK
2024-11-30T22:18:59.564264Z    15 Quit  
2024-11-30T22:18:59.565126Z    14 Query START TRANSACTION
2024-11-30T22:18:59.565645Z    14 Prepare   SELECT QueueJobsPending.id AS QueueJobsPending_id,QueueJobsPending.jobKey AS QueueJobsPending_jobKey,QueueJobsPending.jobState AS QueueJobsPending_jobState,QueueJobsPending.queueName AS QueueJobsPending_queueName,QueueJobsPending.params AS QueueJobsPending_params,QueueJobsPending.tags AS QueueJobsPending_tags,QueueJobsPending.maxExecutionTime AS QueueJobsPending_maxExecutionTime,QueueJobsPending.maxReleases AS QueueJobsPending_maxReleases,QueueJobsPending.releasesCounter AS QueueJobsPending_releasesCounter,QueueJobsPending.maxRetries AS QueueJobsPending_maxRetries,QueueJobsPending.retriesCounter AS QueueJobsPending_retriesCounter,QueueJobsPending.executeAfterTimestamp AS QueueJobsPending_executeAfterTimestamp,QueueJobsPending.createdAt AS QueueJobsPending_createdAt,QueueJobsPending.updatedAt AS QueueJobsPending_updatedAt
                FROM QueueJobsPending
                WHERE
                    jobState = 'Pending'
                    AND executeAfterTimestamp <= '1733005139'
                    AND queueName IN ('low','high','medium')
                ORDER BY FIELD(queueName, 'low','high','medium'), sequence ASC
                LIMIT 1
                FOR UPDATE SKIP LOCKED
2024-11-30T22:18:59.565774Z    14 Execute   SELECT QueueJobsPending.id AS QueueJobsPending_id,QueueJobsPending.jobKey AS QueueJobsPending_jobKey,QueueJobsPending.jobState AS QueueJobsPending_jobState,QueueJobsPending.queueName AS QueueJobsPending_queueName,QueueJobsPending.params AS QueueJobsPending_params,QueueJobsPending.tags AS QueueJobsPending_tags,QueueJobsPending.maxExecutionTime AS QueueJobsPending_maxExecutionTime,QueueJobsPending.maxReleases AS QueueJobsPending_maxReleases,QueueJobsPending.releasesCounter AS QueueJobsPending_releasesCounter,QueueJobsPending.maxRetries AS QueueJobsPending_maxRetries,QueueJobsPending.retriesCounter AS QueueJobsPending_retriesCounter,QueueJobsPending.executeAfterTimestamp AS QueueJobsPending_executeAfterTimestamp,QueueJobsPending.createdAt AS QueueJobsPending_createdAt,QueueJobsPending.updatedAt AS QueueJobsPending_updatedAt
                FROM QueueJobsPending
                WHERE
                    jobState = 'Pending'
                    AND executeAfterTimestamp <= '1733005139'
                    AND queueName IN ('low','high','medium')
                ORDER BY FIELD(queueName, 'low','high','medium'), sequence ASC
                LIMIT 1
                FOR UPDATE SKIP LOCKED
2024-11-30T22:18:59.566372Z    14 Close stmt    
2024-11-30T22:18:59.566939Z    14 Prepare   UPDATE QueueJobsPending
            SET jobKey = ?,jobState = ?,queueName = ?,params = ?,tags = ?,maxExecutionTime = ?,maxReleases = ?,releasesCounter = ?,maxRetries = ?,retriesCounter = ?,executeAfterTimestamp = ?,createdAt = ?,updatedAt = ?
            WHERE id = ?
2024-11-30T22:18:59.567002Z    14 Execute   UPDATE QueueJobsPending
            SET jobKey = 'SendCompanyUserAppointmentRescheduledEmail',jobState = 'Processing',queueName = 'Medium',params = '{\"orderId\":\"jNpAhJ9oOLqJkjv\"}',tags = '{\"orderId\":\"jNpAhJ9oOLqJkjv\",\"companyId\":\"87ZIKFuNgI14YSW\"}',maxExecutionTime = '30',maxReleases = '0',releasesCounter = '0',maxRetries = '3',retriesCounter = '0',executeAfterTimestamp = '1733005138',createdAt = '2024-11-30 22:18:58',updatedAt = '2024-11-30 22:18:59'
            WHERE id = 'K4cr7SCAaIQDsxl'
2024-11-30T22:18:59.567160Z    16 Query START TRANSACTION
2024-11-30T22:18:59.567560Z    14 Close stmt    
2024-11-30T22:18:59.567635Z    14 Query COMMIT
2024-11-30T22:18:59.568050Z    16 Prepare   SELECT QueueJobsPending.id AS QueueJobsPending_id,QueueJobsPending.jobKey AS QueueJobsPending_jobKey,QueueJobsPending.jobState AS QueueJobsPending_jobState,QueueJobsPending.queueName AS QueueJobsPending_queueName,QueueJobsPending.params AS QueueJobsPending_params,QueueJobsPending.tags AS QueueJobsPending_tags,QueueJobsPending.maxExecutionTime AS QueueJobsPending_maxExecutionTime,QueueJobsPending.maxReleases AS QueueJobsPending_maxReleases,QueueJobsPending.releasesCounter AS QueueJobsPending_releasesCounter,QueueJobsPending.maxRetries AS QueueJobsPending_maxRetries,QueueJobsPending.retriesCounter AS QueueJobsPending_retriesCounter,QueueJobsPending.executeAfterTimestamp AS QueueJobsPending_executeAfterTimestamp,QueueJobsPending.createdAt AS QueueJobsPending_createdAt,QueueJobsPending.updatedAt AS QueueJobsPending_updatedAt
                FROM QueueJobsPending
                WHERE
                    jobState = 'Pending'
                    AND executeAfterTimestamp <= '1733005139'
                    AND queueName IN ('low','high','medium')
                ORDER BY FIELD(queueName, 'low','high','medium'), sequence ASC
                LIMIT 1
                FOR UPDATE SKIP LOCKED
2024-11-30T22:18:59.568133Z    16 Execute   SELECT QueueJobsPending.id AS QueueJobsPending_id,QueueJobsPending.jobKey AS QueueJobsPending_jobKey,QueueJobsPending.jobState AS QueueJobsPending_jobState,QueueJobsPending.queueName AS QueueJobsPending_queueName,QueueJobsPending.params AS QueueJobsPending_params,QueueJobsPending.tags AS QueueJobsPending_tags,QueueJobsPending.maxExecutionTime AS QueueJobsPending_maxExecutionTime,QueueJobsPending.maxReleases AS QueueJobsPending_maxReleases,QueueJobsPending.releasesCounter AS QueueJobsPending_releasesCounter,QueueJobsPending.maxRetries AS QueueJobsPending_maxRetries,QueueJobsPending.retriesCounter AS QueueJobsPending_retriesCounter,QueueJobsPending.executeAfterTimestamp AS QueueJobsPending_executeAfterTimestamp,QueueJobsPending.createdAt AS QueueJobsPending_createdAt,QueueJobsPending.updatedAt AS QueueJobsPending_updatedAt
                FROM QueueJobsPending
                WHERE
                    jobState = 'Pending'
                    AND executeAfterTimestamp <= '1733005139'
                    AND queueName IN ('low','high','medium')
                ORDER BY FIELD(queueName, 'low','high','medium'), sequence ASC
                LIMIT 1
                FOR UPDATE SKIP LOCKED
2024-11-30T22:18:59.568530Z    16 Close stmt    
2024-11-30T22:18:59.568789Z    16 Query COMMIT
...

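After more research on the deadlock itself I found this Stack Overflow thread, whose first answer seems to suggest that there is a potential problem when the same row is locked through two different indexes. Could that be the cause in my case? As I understand it, two connections lock the same row but via different indexes, and that is what leads to the deadlock.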

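These are the outputs I get when running EXPLAIN on both queries.

[Screenshot: EXPLAIN output for the SELECT query] [Screenshot: EXPLAIN output for the UPDATE query]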

I tried to provide as much detail as possible (but maybe I overdid it 😅).

Anyway, any help is much appreciated, thank you! 🤞

Update 1

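I tried running the same query from the terminal to see whether two sessions would retrieve the same row from QueueJobsPending, but they did not: SKIP LOCKED works as expected. Screenshot below (I ran the right-hand session first).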

[Screenshot: running SKIP LOCKED from the terminal]

Update 2

Could this be related to the locks placed when my SELECT queries run concurrently?

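I know these are two very different examples, but my locks look quite different from the ones on this page: https://dev.mysql.com/blog-archive/mysql-8-0-1-using-skip-locked-and-nowait-to-handle-hot-rows/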

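All the locks on that page are placed on the primary id column, whereas in my case they are all placed on the index_jobState index. There is one on the primary key, but its lock type looks odd.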

[Screenshot: locks after two sessions running in separate terminals]

php mysql pdo innodb database-deadlocks
1 Answer
  1. Replace the index index_jobState with these:

     INDEX(jobState, executeAfterTimestamp, queueName, id)
     INDEX(jobState, queueName, executeAfterTimestamp, id)
    
  2. Elaborate on what this is and how it is set:

     id CHAR(15) BINARY UNIQUE NOT NULL, PRIMARY KEY (id)

     It may be part of the problem, especially if it is some kind of random UUID.

  3. Try these two recipes:

     BEGIN;
         SELECT ...; -- without the SKIP LOCKED
         UPDATE ...;
     COMMIT;

     Or skip the SELECT entirely by doing:

     UPDATE QueueJobsPending
         SET    jobState = 'Processing',
                updatedAt = NOW()   -- createdAt is left untouched; add any other columns that need changing
         WHERE  jobState = 'Pending'
           AND  executeAfterTimestamp <= 123456789
           AND  queueName IN ('High','Medium','Low')
         ORDER BY  FIELD(queueName, 'High','Medium','Low'), sequence ASC
         LIMIT  1;

     (And leave autocommit=1 for this UPDATE.)
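
A rough sketch of the single-statement recipe using PDO positional parameters instead of interpolated strings, so that the queue list is bound once for IN (...) and once for FIELD(...). The names buildClaimUpdate and claimNextJob are hypothetical, the $pdo connection is assumed to be in its default autocommit mode, and only jobState and updatedAt are changed here.

```php
<?php
declare(strict_types=1);

/**
 * Builds the single-statement claim UPDATE and its positional parameters.
 * Hypothetical helper; table and column names come from the question.
 *
 * @param string[] $queueNames queue names in priority order (highest first)
 * @return array{0: string, 1: array<int, int|string>}
 */
function buildClaimUpdate(array $queueNames, int $now): array
{
    if ($queueNames === []) {
        throw new InvalidArgumentException('Queue name list should not be empty.');
    }

    $queueNames = array_values($queueNames);
    // One "?" per queue name, reused for both IN (...) and FIELD(...).
    $placeholders = implode(',', array_fill(0, count($queueNames), '?'));

    $sql = "
        UPDATE QueueJobsPending
        SET jobState = 'Processing', updatedAt = NOW()
        WHERE jobState = 'Pending'
          AND executeAfterTimestamp <= ?
          AND queueName IN ({$placeholders})
        ORDER BY FIELD(queueName, {$placeholders}), sequence ASC
        LIMIT 1
    ";

    // Parameter order must follow the order of the "?" markers in the SQL.
    $params = array_merge([$now], $queueNames, $queueNames);

    return [$sql, $params];
}

/**
 * Tries to claim one job; returns true if a row was switched to Processing.
 */
function claimNextJob(PDO $pdo, array $queueNames, int $now): bool
{
    [$sql, $params] = buildClaimUpdate($queueNames, $now);
    $statement = $pdo->prepare($sql);
    $statement->execute($params);

    // For an UPDATE, rowCount() reports the number of rows the statement changed.
    return $statement->rowCount() === 1;
}
```

Note that this UPDATE only reports whether a row was claimed, not which one. Identifying the claimed job would need an additional step that the recipe leaves open.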
