Django model.save() 执行不一致的更新

问题描述 投票:0回答:1

我正在使用 django ORM 在 RabbitMQ 消费者的回调函数内与 MySQL 数据库进行通信。这些消费者在单独的线程上运行,每个消费者都与其队列建立了自己的连接。

这是我的两个消费者回调的代码:

TasksExecutorService

# imports
from pika.spec import Basic
from pika.channel import Channel
from pika import BasicProperties

import uuid

from jobs.models import Task

from exceptions import MasterConsumerServiceError as ServiceError

from .master_service import MasterConsumerSerivce


class TaskExecutorService(MasterConsumerSerivce):
  queue = 'master_tasks'

  @classmethod
  def callback(cls, ch: Channel, method: Basic.Deliver, properties: BasicProperties, message: dict):
    # get task
    task_id_str = message.get('task_id')
    task_id = uuid.UUID(task_id_str)
    task_qs = Task.objects.filter(pk=task_id)
    if not task_qs.exists():
      raise ServiceError(message=f'Task {task_id_str} does not exist')
    task = task_qs.first()

    # check if task is stopped
    if task.status == cls.Status.TASK_STOPPED:
      raise ServiceError(message=f'Task {task_id_str} is stopped')

    # send task to results queue
    publisher = cls.get_publisher(queue=cls.Queues.results_queue)
    published, error = publisher.publish(message=message | {'status': True, 'error': None})
    if not published:
      raise ServiceError(message=str(error))

    # update task status
    task.status = cls.Status.TASK_PROCESSING
    task.save()

    return

结果处理服务

# imports
from pika.spec import Basic
from pika.channel import Channel
from pika import BasicProperties

import uuid

from jobs.models import Task
from exceptions import MasterConsumerServiceError as ServiceError

from .master_service import MasterConsumerSerivce


class ResultHandlerService(MasterConsumerSerivce):
  queue = 'master_results'

  @classmethod
  def callback(cls, ch: Channel, method: Basic.Deliver, properties: BasicProperties, message: dict):
    # get task
    task_id_str = message.get('task_id')
    task_id = uuid.UUID(task_id_str)
    task_qs = Task.objects.filter(pk=task_id)
    if not task_qs.exists():
      raise ServiceError(message=f'Task {task_id_str} does not exist')
    task = task_qs.first()

    # get result data and status
    data = message.get('data')
    status = message.get('status')

    # if task is not successful
    if not status:
      # fail task
      task.status = cls.Status.TASK_FAILED
      task.save()

      # fail job
      task.job.status = cls.Status.JOB_FAILED
      task.job.save()

      return

    # update task status
    task.status = cls.Status.TASK_DONE
    task.save()

    # check if job is complete
    task_execution_order = task.process.execution_order
    next_task_qs = Task.objects.select_related('process').filter(job=task.job, process__execution_order=task_execution_order + 1)
    is_job_complete = not next_task_qs.exists()

    # check job is complete
    if is_job_complete:
      # publish reults
      publisher = cls.get_publisher(queue=cls.Queues.output_queue)
      published, error = publisher.publish(message={'job_id': str(task.job.id), 'data': data})
      if not published:
        raise ServiceError(message=str(error))

      # update job status
      task.job.status = cls.Status.JOB_DONE
      task.job.save()

    # otherwise
    else:
      # publish next task
      next_task = next_task_qs.first()
      publisher = cls.get_publisher(queue=cls.Queues.tasks_queue)
      published, error = publisher.publish(message={'task_id': str(next_task.id), 'data': data})
      if not published:
        raise ServiceError(message=str(error))

      # update next task status
      next_task.status = cls.Status.TASK_QUEUED
      next_task.save()

    return

问题是无论我在哪里使用:

task.status = cls.Status.TASK_ABC
task.save()

由此产生的行为非常不稳定。有时一切正常,所有状态都按预期更新,但大多数情况下,即使流程按预期完成并且我的输出队列填充了结果,状态也永远不会更新。如果我在执行

task.save()
后记录任务状态,记录的状态也是我期望看到的,但数据库内的值永远不会更新。

如果需要,我很乐意提供更多代码。 请帮我解决这个问题。

python django django-rest-framework rabbitmq python-multithreading
1个回答
0
投票

这称为“竞争条件”。您有两个不同的线程同时修改同一个对象。

在某个时间点,它们必然都会有过时的数据。

即,当线程B更改并保存对象时,线程A的数据就变得陈旧了。此时如果线程 A 保存该对象,那么它会将陈旧数据(旧值)保存到数据库中。

发生这种情况是因为当数据库中的数据发生更改时,Django 不会自动更新模型实例对象。

竞争条件的解决方案是使用锁。

现在,我不太了解代码中发生的所有事情,所以我无法给出代码示例。但是您可以使用

select_for_update
原子交易来解决您的问题。

© www.soinside.com 2019 - 2024. All rights reserved.