我正在使用 django ORM 在 RabbitMQ 消费者的回调函数内与 MySQL 数据库进行通信。这些消费者在单独的线程上运行,每个消费者都与其队列建立了自己的连接。
这是我的两个消费者回调的代码:
TasksExecutorService
# imports
from pika.spec import Basic
from pika.channel import Channel
from pika import BasicProperties
import uuid
from jobs.models import Task
from exceptions import MasterConsumerServiceError as ServiceError
from .master_service import MasterConsumerSerivce
class TaskExecutorService(MasterConsumerSerivce):
queue = 'master_tasks'
@classmethod
def callback(cls, ch: Channel, method: Basic.Deliver, properties: BasicProperties, message: dict):
# get task
task_id_str = message.get('task_id')
task_id = uuid.UUID(task_id_str)
task_qs = Task.objects.filter(pk=task_id)
if not task_qs.exists():
raise ServiceError(message=f'Task {task_id_str} does not exist')
task = task_qs.first()
# check if task is stopped
if task.status == cls.Status.TASK_STOPPED:
raise ServiceError(message=f'Task {task_id_str} is stopped')
# send task to results queue
publisher = cls.get_publisher(queue=cls.Queues.results_queue)
published, error = publisher.publish(message=message | {'status': True, 'error': None})
if not published:
raise ServiceError(message=str(error))
# update task status
task.status = cls.Status.TASK_PROCESSING
task.save()
return
结果处理服务
# imports
from pika.spec import Basic
from pika.channel import Channel
from pika import BasicProperties
import uuid
from jobs.models import Task
from exceptions import MasterConsumerServiceError as ServiceError
from .master_service import MasterConsumerSerivce
class ResultHandlerService(MasterConsumerSerivce):
queue = 'master_results'
@classmethod
def callback(cls, ch: Channel, method: Basic.Deliver, properties: BasicProperties, message: dict):
# get task
task_id_str = message.get('task_id')
task_id = uuid.UUID(task_id_str)
task_qs = Task.objects.filter(pk=task_id)
if not task_qs.exists():
raise ServiceError(message=f'Task {task_id_str} does not exist')
task = task_qs.first()
# get result data and status
data = message.get('data')
status = message.get('status')
# if task is not successful
if not status:
# fail task
task.status = cls.Status.TASK_FAILED
task.save()
# fail job
task.job.status = cls.Status.JOB_FAILED
task.job.save()
return
# update task status
task.status = cls.Status.TASK_DONE
task.save()
# check if job is complete
task_execution_order = task.process.execution_order
next_task_qs = Task.objects.select_related('process').filter(job=task.job, process__execution_order=task_execution_order + 1)
is_job_complete = not next_task_qs.exists()
# check job is complete
if is_job_complete:
# publish reults
publisher = cls.get_publisher(queue=cls.Queues.output_queue)
published, error = publisher.publish(message={'job_id': str(task.job.id), 'data': data})
if not published:
raise ServiceError(message=str(error))
# update job status
task.job.status = cls.Status.JOB_DONE
task.job.save()
# otherwise
else:
# publish next task
next_task = next_task_qs.first()
publisher = cls.get_publisher(queue=cls.Queues.tasks_queue)
published, error = publisher.publish(message={'task_id': str(next_task.id), 'data': data})
if not published:
raise ServiceError(message=str(error))
# update next task status
next_task.status = cls.Status.TASK_QUEUED
next_task.save()
return
问题是无论我在哪里使用:
task.status = cls.Status.TASK_ABC
task.save()
由此产生的行为非常不稳定。有时一切正常,所有状态都按预期更新,但大多数情况下,即使流程按预期完成并且我的输出队列填充了结果,状态也永远不会更新。如果我在执行
task.save()
后记录任务状态,记录的状态也是我期望看到的,但数据库内的值永远不会更新。
如果需要,我很乐意提供更多代码。 请帮我解决这个问题。
这称为“竞争条件”。您有两个不同的线程同时修改同一个对象。
在某个时间点,它们必然都会有过时的数据。
即,当线程B更改并保存对象时,线程A的数据就变得陈旧了。此时如果线程 A 保存该对象,那么它会将陈旧数据(旧值)保存到数据库中。
发生这种情况是因为当数据库中的数据发生更改时,Django 不会自动更新模型实例对象。
竞争条件的解决方案是使用锁。
现在,我不太了解代码中发生的所有事情,所以我无法给出代码示例。但是您可以使用
select_for_update
和 原子交易来解决您的问题。