多线程中的生产者-消费者队列,关闭机制

问题描述 投票:0回答:1

我写了下面的伪代码。它是关于多线程中经典的生产者-消费者队列,但我正在尝试添加一个优雅的关闭机制。我希望 ConsumerItem 继续工作,直到队列中的所有项目都被消耗为止。这是一个好的实施吗?或者我是否错过了可能导致死锁或陈旧线程的场景?谢谢你。

thread addItems() {
while (!shutdown) {
    item = ItemFactory.produceItem();
    lock.acquire();
    try {
        while (self.queue.length() == MAX_QUEUE_SIZE) {
            conditionHasSpace.wait()
        }
        if (!shutdown) {
            self.queue.insert(order);
        }
        conditionHasItems.notify();
    }
    catch {
        handleException();
    }
    finally {
        lock.release()
    }
}

}

thread consumeItems() {
while (true) {
    lock.acquire();
    try { 
        while (self.queue.length() == 0) {
            if (shutdown) {
                exit;
            }
            conditionHasItems.wait();
        }
        self.queue.pop();
        conditionHasSpace.notify();
    }
    catch {
        handleException();
    }
    finally {
        lock.release();
    }
}

}

multithreading algorithm queue pseudocode
1个回答
0
投票

这里是使用Ada编程语言的解决方案。 Ada 提供任务作为主动并发单元(通常映射到操作系统线程),并将受保护对象作为被动并发单元。本示例使用受保护的对象来实现生产者和消费者共享的缓冲区。

受保护对象、生产者任务和消费者任务在名为 Producer_and_consumers 的包中声明和定义。该包有两部分。第一部分是包规范,提供包的API。第二部分是包体,包含包逻辑的实现。

包装规格非常简单。它只是声明两个任务的名称。

package producers_and_consumers is
   task producer;
   task consumer;
end producers_and_consumers;

有趣的部分是在包体中定义了受保护的缓冲区、生产者任务和消费者任务。

with Ada.Text_IO; use Ada.Text_IO;

package body producers_and_consumers is
   -- Protected object acting as the buffer
   -- for the producer and consumer tasks
   ------------------------------------------

   ------------
   -- buffer --
   ------------

   type Idx_type is mod 10;
   type circular_array is array (Idx_Type) of Integer;

   protected buffer is
      entry write (Item : in Integer);
      entry read (Item : out Integer);
      procedure set_done;
      function is_done return Boolean;
      function Buf_Empty return Boolean;
   private
      Buf           : circular_array;
      Write_Idx     : Idx_type := 0;
      Read_Idx      : Idx_type := 0;
      Producer_Done : Boolean  := False;
      Count         : Natural  := 0;
   end buffer;

   protected body buffer is
      entry write (Item : in Integer) when Count < Idx_type'Modulus is
      begin
         Buf (Write_Idx) := Item;
         Write_Idx       := Write_Idx + 1;
         Count           := Count + 1;
      end write;

      entry read (Item : out Integer) when Count > 0 is
      begin
         Item     := Buf (Read_Idx);
         Read_Idx := Read_Idx + 1;
         Count    := Count - 1;
      end read;

      procedure set_done is
      begin
         Producer_Done := True;
      end set_done;

      function Is_Done return Boolean is
      begin
         return Producer_Done;
      end Is_Done;

      function Buf_Empty return Boolean is
      begin
         return Count = 0;
      end Buf_Empty;

   end buffer;

   --------------
   -- producer --
   --------------

   task body producer is
   begin
      for I in 1 .. 30 loop
         buffer.write (I);
      end loop;
      buffer.set_done;
   end producer;

   --------------
   -- consumer --
   --------------

   task body consumer is
      Value : Integer;
   begin
      while not buffer.is_done loop
         buffer.read (Value);
         Put_Line ("Consumer read" & Value'Image);
      end loop;
      while not buffer.Buf_Empty loop
         buffer.read (Value);
         Put_Line ("Consumer read" & Value'Image);
      end loop;
   end consumer;

end producers_and_consumers;

受保护的缓冲区也分为两部分,规范和主体。该规范在公共部分中声明用于与受保护对象交互的方法,并在私有部分中声明受保护对象的数据成员。保护对象具有三种功能。受保护的条目隐式操作独占读写锁,并且仅当其边界条件计算为 TRUE 时才执行。受保护的过程操作独占读写锁并无条件执行。受保护的函数只能从受保护的对象读取值。受保护的函数实现共享读锁。

这个受保护对象实现了两个名为 write 和 read 的条目。它实现一个名为 set_done 的过程。它实现了两个名为 is_done 和 Buf_Empty 的函数。

受保护规范的私有部分声明了 5 个数据项。

  • Buf 是一个循环数组,它将包含由写条目写入受保护对象的数据以及由读条目从受保护对象读取的数据。
  • Write_Idx 是一个变量,用于跟踪写入条目使用的索引。 Write_Idx 是包体中定义的 Idx_type 类型的实例。 Idx_type 是模块化类型,有效值范围为 0 到 9。应用于模块化类型实例的所有算术本身都是模块化的。当包含值 9 的 Idx_type 对象递增时,结果将为 0。
  • Read_Idx 是一个变量,用于跟踪读取条目使用的索引。 Read_Idx 与 Write_Idx 一样,是 Idx_type 的实例。
  • Producer_Done 是一个布尔变量,初始化为 False。
  • Count 是 Natural 子类型的一个实例,初始化为 0。 受保护体定义了所有的条目、功能和过程。

生产者任务主体定义了生产者任务的逻辑。该任务只是将数字 1 到 30 写入 Buffer protected 对象,然后调用 Buffer.set_done。

消费者任务主体定义了消费者任务的逻辑。该任务使用 while 循环来迭代 Buffer 受保护对象中的值,直到 Buffer.is_done 返回 TRUE。然后,消费者任务进入第二个循环以迭代 Buffer 受保护的对象,直到 Buffer.Buf_Empty 返回 True。

Buffer 保护对象 set_done、is_done 和 Buf_Empty 方法提供了协调生产者和消费者有序关闭的方法。

主程序,即本例中的程序入口点,简单地声明了对 Producer_and_consumers 包的依赖,该包会自动启动生产者和消费者任务。主程序只有在生产者和消费者任务都完成后才会退出。

程序的输出是:

C:\Users\jimma\Ada\Producer_Consumer\Auto_shutdown\obj\main.exe
Consumer read 1
Consumer read 2
Consumer read 3
Consumer read 4
Consumer read 5
Consumer read 6
Consumer read 7
Consumer read 8
Consumer read 9
Consumer read 10
Consumer read 11
Consumer read 12
Consumer read 13
Consumer read 14
Consumer read 15
Consumer read 16
Consumer read 17
Consumer read 18
Consumer read 19
Consumer read 20
Consumer read 21
Consumer read 22
Consumer read 23
Consumer read 24
Consumer read 25
Consumer read 26
Consumer read 27
Consumer read 28
Consumer read 29
Consumer read 30
[2024-10-04 20:07:51] process terminated successfully, elapsed time: 01.05s
© www.soinside.com 2019 - 2024. All rights reserved.