Sidekiq队列用于指定对象

问题描述 投票:1回答:2

我用这个工人进行处理

class CreateOrUpdateContactWorker
    include Sidekiq::Worker
    sidekiq_options retry: 2, queue: 'contact_updater', concurrency: 1

    sidekiq_retries_exhausted do |msg|
        Airbrake.notify(error_message: "Contact update failed", session: { msg: msg })
    end

    def perform(user_id, changed_fields, update_address = false)
        ContactUpdater.create_or_update_contact(user_id, changed_fields, update_address: update_address)
    end
end

在用户模型中,我有after_commit回调

def update_mautic_contact
    CreateOrUpdateContactWorker.perform_async(id, previous_changes.keys, ship_address_changed || false)
end

问题是当用户同时更新两次时,因为create_or_update_contact需要一些时间。如何仅为指定用户限制线程?每个任务将逐个执行以指定user_id。

ruby-on-rails-4 sidekiq worker
2个回答
0
投票

我不知道你是否有redis作为基础设施的一部分,但你所描述的是竞争条件。要解决它,你需要互斥锁/锁到你的关键路径create_or_update_contact

这里的竞争条件发生在两个异步工作者/进程之间,所以你不能只使用简单的ruby互斥锁/锁。您需要一个使用中央锁存储/保持器的分布式互斥锁。这个:https://github.com/kenn/redis-mutex应该为你做,但你需要redis数据库。

基本上你的代码会像:

class CreateOrUpdateContactWorker
    include Sidekiq::Worker
    sidekiq_options retry: 2, queue: 'contact_updater', concurrency: 1

    sidekiq_retries_exhausted do |msg|
        Airbrake.notify(error_message: "Contact update failed", session: { msg: msg })
    end

    def perform(user_id, changed_fields, update_address = false)
        RedisMutex.with_lock("#{user_id}_create_or_update_contact") do
            ContactUpdater.create_or_update_contact(user_id, changed_fields, update_address: update_address)
        end
    end
end

因此,如果您同时为user_id = 1有2个用户更新,则第一个获取名为1_create_or_update_contact的锁/互斥锁将首先执行,并将阻止另一个调用,直到它完成,然后第二个调用将开始。

这将解决你的问题:)我认为redis是必要的,有用的和少数。如果不使用redis,我几乎无法想到任何我的铁轨项目。


0
投票

我用Redis意识到了这一点,但没有任何宝石。我在执行worker之前使用了条件:

def update_mautic_contact
    if Rails.current.get("CreateOrUpdateContactWorkerIsRunning_#{id}")
        Redis.current.set("CreateOrUpdateContactWorkerIsRunning_#{id}", true)
        CreateOrUpdateContactWorker.perform_in(1.minutes, id, changed_fields)
    else
        Redis.current.set("CreateOrUpdateContactWorkerIsRunning_#{id}", true)
        CreateOrUpdateContactWorker.perform_async(id, changed_fields)
    end
end

和内部工人:

class CreateOrUpdateContactWorker
    include Sidekiq::Worker
    sidekiq_options retry: 2, queue: 'contact_updater', concurrency: 1

    sidekiq_retries_exhausted do |msg|
        Airbrake.notify(error_message: "Contact update failed", session: { msg: msg })
    end

    def perform(user_id, changed_fields, update_address = false)
        ContactUpdater.create_or_update_contact(user_id, changed_fields, update_address: update_address)
        Redis.current.del("CreateOrUpdateContactWorkerIsRunning_#{user_id}")
    end
end
© www.soinside.com 2019 - 2024. All rights reserved.