实现线程安全双端队列

问题描述 投票:0回答:1

作为在 MacOS 上运行的服务器应用程序的一部分,我需要实现一个线程安全的

deque
以便将作业提供给线程池。我是 Swift 新手。

在 C++ 中它看起来像:

class JobQueue
{
public:
    size_t size()
    {
        mGuard.lock(mMutex);
        return mJobFifo.size()
    }
    
    void push(const Job &job)
    {
        mGuard.lock(mMutex);
        mJobFifo.push_back(job);
    }

    Job pop()
    {
        mGuard.lock(mMutex);
        Job tmp = mJobFifo.front();
        mJobFifo.pop_front();
        return tmp;
    }   
    
private:
    std::deqeue<Job>            mJobFifo;
    std::lock_guard<std::mutex> mGuard;
    std::mutex                  mMutex;
}

在 Swift 中,我尝试了下面的代码,但是

size()
总是返回零,尽管支持
Deque
有很多元素。

我读过一些文章说使用

mQueue.async(flags: .barrier)
来锁定独占访问(如
std::lock_guard
??),但函数的返回值未更新。

我还尝试将

return ret
放入 .async 中,但它给出了错误“无法将类型 'Int' 的值转换为闭包结果类型 'Void'”:

    mQueue.async(flags: .barrier)
    {
        ret = self.mJobFifo.count
        print("internal size: \(ret)")
        return ret
    }

这是实现这样一个线程安全队列的最佳方式吗?如果是这样,那我做错了什么?如果不是的话应该怎么做呢?

import Foundation
import DequeModule
class OcrJob
{
    static var lastJobId : Int = 0
    var jobId : Int
    
    init()
    {
        jobId = OcrJob.lastJobId
        OcrJob.lastJobId += 1
    }
}

class OcrJobQueue
{
    init(max: UInt)
    { 
        mMaxSize = max
    }
    
    public func size() -> Int
    {
        var ret = 0
        mQueue.async(flags: .barrier)
        {
            ret = self.mJobFifo.count
            print("internal size: \(ret)")
        }
        
        print("size returns \(ret). ")
        return ret
    }
    
    public func pushBack(job: OcrJob) throws
    {
        let curSize = mJobFifo.count
        guard (curSize < mMaxSize) else
        {
            print("pushBack - curSize: \(curSize), mMaxSize \(mMaxSize)")
            throw QueueError.full
        }
        
        mQueue.async(flags: .barrier)
        {
            self.mJobFifo.append(job)
            print("pushBack: \(self.mJobFifo.count) jobs in queue")
        }
    }
    
    public func popFirst() -> OcrJob?
    {
        var ret : OcrJob? = nil
        mQueue.async(flags: .barrier)
        {
            ret = self.mJobFifo.popFirst()
            print("popFirst: \(self.mJobFifo.count) jobs in queue")
        }
        
        return ret
    }
    
    private var mMaxSize : UInt = 10
    private var mQueue = DispatchQueue(label: "JobQueue", attributes: .concurrent)
    private var mJobFifo : Deque<OcrJob> = []
    
}
swift multithreading macos
1个回答
0
投票

解决这个问题最简单的方法可能是使用 Swift Actor。 参与者隔离对其可变状态的访问。

我还会将

OcrJob
更改为结构体,使其成为值类型,并消除静态属性,因为这是需要关注的更可变的状态。

struct OcrJob {

   let jobId = UUID()

}

actor OcrJobQueue {

    let maxSize: UInt
    private var jobFifo = [OcrJob]()

    init(max: UInt = 10)
    { 
        maxSize = max
    }

    public var size: Uint 
    {
        return self.jobFifo.count
    }
    
    public func pushBack(job: OcrJob) throws
    {
        guard (jobFifo.count < mMaxSize) else
        {
            throw QueueError.full
        }
        jobFifo.append(job)
    }
    
    public func popFirst() -> OcrJob?
    {
        guard !self.jobFifo.isEmpty else {
            return nil
        }
        return self.jobFifo.removeFirst()
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.