我的生产者-消费者实现超出了最大大小

问题描述 投票:0回答:1

我已经实现了一个 Runnable Producer 和一个 Runnable Consumer 以及一个 Store 类。商店分配了最大尺寸的货架。每个生产者添加一个项目,每个消费者将删除一个项目。我使用 CachedThreadPool 创建两种类型的线程,其中每个线程都在无限循环上运行。我还为 Runnable 类实现了两个单独的信号量,生产者的初始许可大小等于 Store 的最大货架大小,而消费者的初始许可设置为零。

当我运行代码时,我收到了 IndexOutOfBoundsException 的运行时错误,我知道问题在于“this.items.size()-1”行,其中两个消费者读取相同的索引并尝试删除它们,从而导致上述情况错误。但我还收到了另一个意想不到的结果,即我的列表中包含的物品数量超过了货架大小。例如,在下面的代码中,我将初始 maxShelfs 大小设置为 5,并为 Producer 提供了许可大小 5。但是我不断获得输出“Consumer size: 6”,在此之后,我尝试在 addItem 方法中放置一个断点列表大小超过 maxShelf 大小。多次运行证实列表大小确实达到了 6,而且这种情况很少发生。我不明白为什么在实际许可证有限的情况下列表大小是 6?

public class main {
    public static void main(String[] args) {
        Semaphore ps = new Semaphore(5);
        Semaphore cs = new Semaphore(0);
        Store s = new Store(5,ps,cs);
        ExecutorService es = Executors.newCachedThreadPool();
        for(int i =0;i<8;i++)
        {
            es.execute(new Producer(s,ps,cs));
        }
        for(int i =0;i<20;i++)
        {
            es.execute(new Consumer(s,ps,cs));
        }
    }
}


public class Store {
    private List<Object> items;
    public int maxShelfs;
    Semaphore ps;
    Semaphore cs;

    public Store (int maxShelfs,Semaphore ps, Semaphore cs)
    {
        this.ps = ps;
        this.cs = cs;
        this.maxShelfs = maxShelfs;
        items = new ArrayList<>();
    }

    public int getMaxShelfs()
    {
        return maxShelfs;
    }

    public List<Object> getItems()
    {
        return items;
    }

    public void addItem()
    {

        System.out.println("Producer size: "+this.items.size());
        this.items.add(new Object());
        if(items.size()==6) {
            System.out.println("test");
        }
    }

    public void removeItem()
    {

        System.out.println("Consumer size: "+this.items.size());
        this.items.remove(this.items.size()-1);
    }
}


public class Consumer implements Runnable{
    private Store s;
    Semaphore ps;
    Semaphore cs;

    public Consumer(Store s, Semaphore ps, Semaphore cs)
    {
        this.ps = ps;
        this.cs = cs;
        this.s = s;
    }

    public void run()
    {
        while(true)
        {
            try
            {
                cs.acquire();
//                Thread.sleep(20);
            }
            catch (InterruptedException e)
            {
                throw new RuntimeException(e);
            }
            if (s.getItems().size() > 0)
            {
                s.removeItem();
            }
            ps.release();
        }
    }
}


public class Producer implements Runnable{
    private Store s;
    Semaphore ps;
    Semaphore cs;

    public Producer (Store s,Semaphore ps,Semaphore cs)
    {
        this.ps = ps;
        this.cs = cs;
        this.s = s;
    }

    public void run()
    {
        while(true)
        {
            try
            {
                ps.acquire();
//                Thread.sleep(20);
                if (s.getItems().size() < s.getMaxShelfs()) {
                    s.addItem();
                }
                cs.release();

            }
            catch (InterruptedException e)
            {
                throw new RuntimeException(e);
            }



        }
    }
}

我正在尝试学习并发性,并且还没有了解有关 try 和 catch 的更多信息,因此我的最初观点是查看一种罕见的情况,其中由于运行方法期间的误读而导致条件中的列表大小读取重叠可能会导致这种情况,但我没有能够提出这样的测试用例(我被告知 List 不是线程安全的,我应该使用并发数据结构)。

列表大小为6的情况

java concurrency producer-consumer
1个回答
0
投票

您的商店不是线程安全的。由于生产者和消费者都在修改存储,因此他们无法并行操作。您需要使用互斥体序列化添加和删除操作。如果要使用信号量,只需要一个只有一个线程经过的实例:

Semaphore semaphore = new Semaphore(1);

添加元素时需要用到这个信号量:

try {
    semaphore.acquire();
    if (s.getItems().size() < s.getMaxShelfs()) {
        s.addItem();
    }
} finally {
    semaphore.release();
}

删除元素时必须使用相同的信号量:

try {
    semaphore.acquire();
    if (s.getItems().size() > 0) {
        s.removeItem();
    }
} finally {
    semaphore.release();
}
© www.soinside.com 2019 - 2024. All rights reserved.