我用这个工人进行处理
class CreateOrUpdateContactWorker
include Sidekiq::Worker
sidekiq_options retry: 2, queue: 'contact_updater', concurrency: 1
sidekiq_retries_exhausted do |msg|
Airbrake.notify(error_message: "Contact update failed", session: { msg: msg })
end
def perform(user_id, changed_fields, update_address = false)
ContactUpdater.create_or_update_contact(user_id, changed_fields, update_address: update_address)
end
end
在用户模型中,我有after_commit
回调
def update_mautic_contact
CreateOrUpdateContactWorker.perform_async(id, previous_changes.keys, ship_address_changed || false)
end
问题是当用户同时更新两次时,因为create_or_update_contact
需要一些时间。如何仅为指定用户限制线程?每个任务将逐个执行以指定user_id。
我不知道你是否有redis
作为基础设施的一部分,但你所描述的是竞争条件。要解决它,你需要互斥锁/锁到你的关键路径create_or_update_contact
。
这里的竞争条件发生在两个异步工作者/进程之间,所以你不能只使用简单的ruby互斥锁/锁。您需要一个使用中央锁存储/保持器的分布式互斥锁。这个:https://github.com/kenn/redis-mutex应该为你做,但你需要redis
数据库。
基本上你的代码会像:
class CreateOrUpdateContactWorker
include Sidekiq::Worker
sidekiq_options retry: 2, queue: 'contact_updater', concurrency: 1
sidekiq_retries_exhausted do |msg|
Airbrake.notify(error_message: "Contact update failed", session: { msg: msg })
end
def perform(user_id, changed_fields, update_address = false)
RedisMutex.with_lock("#{user_id}_create_or_update_contact") do
ContactUpdater.create_or_update_contact(user_id, changed_fields, update_address: update_address)
end
end
end
因此,如果您同时为user_id = 1有2个用户更新,则第一个获取名为1_create_or_update_contact
的锁/互斥锁将首先执行,并将阻止另一个调用,直到它完成,然后第二个调用将开始。
这将解决你的问题:)我认为redis
是必要的,有用的和少数。如果不使用redis
,我几乎无法想到任何我的铁轨项目。
我用Redis意识到了这一点,但没有任何宝石。我在执行worker之前使用了条件:
def update_mautic_contact
if Rails.current.get("CreateOrUpdateContactWorkerIsRunning_#{id}")
Redis.current.set("CreateOrUpdateContactWorkerIsRunning_#{id}", true)
CreateOrUpdateContactWorker.perform_in(1.minutes, id, changed_fields)
else
Redis.current.set("CreateOrUpdateContactWorkerIsRunning_#{id}", true)
CreateOrUpdateContactWorker.perform_async(id, changed_fields)
end
end
和内部工人:
class CreateOrUpdateContactWorker
include Sidekiq::Worker
sidekiq_options retry: 2, queue: 'contact_updater', concurrency: 1
sidekiq_retries_exhausted do |msg|
Airbrake.notify(error_message: "Contact update failed", session: { msg: msg })
end
def perform(user_id, changed_fields, update_address = false)
ContactUpdater.create_or_update_contact(user_id, changed_fields, update_address: update_address)
Redis.current.del("CreateOrUpdateContactWorkerIsRunning_#{user_id}")
end
end