在https://z0ltan.wordpress.com/2016/09/09/basic-concurrency-and-parallelism-in-common-lisp-part-4a-parallelism-using-lparallel-fundamentals/#channels的lparallel库中对队列的基本讨论说,队列“允许在工作线程之间传递消息”。下面的测试使用共享队列来协调主线程和从属线程,其中主要在退出之前等待下级的完成:
(defun foo (q)
(sleep 1)
(lparallel.queue:pop-queue q)) ;q is now empty
(defun test ()
(setf lparallel:*kernel* (lparallel:make-kernel 1))
(let ((c (lparallel:make-channel))
(q (lparallel.queue:make-queue)))
(lparallel.queue:push-queue 0 q)
(lparallel:submit-task c #'foo q)
(loop do (sleep .2)
(print (lparallel.queue:peek-queue q))
when (lparallel.queue:queue-empty-p q)
do (return)))
(lparallel:end-kernel :wait t))
这按预期产生输出:
* (test)
0
0
0
0
NIL
(#<SB-THREAD:THREAD "lparallel" FINISHED values: NIL {10068F2B03}>)
我的问题是我是否正确或完全使用lparallel的队列功能。似乎队列只是替代使用全局变量来保存线程共享对象。使用队列的设计优势是什么?通常好的做法是为每个提交的任务分配一个队列(假设任务需要通信)?感谢您提供更深入的见解。
多线程工作是通过管理对可变共享状态的并发访问来完成的,即您可以锁定公共数据结构,并且每个线程都可以读取或写入它。
但是,建议最小化同时访问的数据的数量。队列是一种通过让每个线程管理其本地状态并仅通过消息交换数据来将工作者彼此分离的方法;这是线程安全的,因为对队列的访问由locks and condition variables控制。
你在主线程中做的是在队列为空时轮询;这可能有效但这会适得其反,因为队列被用作同步机制,但在这里你自己进行同步。
(ql:quickload :lparallel)
(defpackage :so (:use :cl
:lparallel
:lparallel.queue
:lparallel.kernel-util))
(in-package :so)
让我们改变foo
,使其获得两个队列,一个用于传入请求,一个用于回复。在这里,我们对发送的数据执行简单的转换,对于每个输入消息,只有一个输出消息,但并非总是如此。
(defun foo (in out)
(push-queue (1+ (pop-queue in)) out))
更改test
以便控制流仅基于读取/写入队列:
(defun test ()
(with-temp-kernel (1)
(let ((c (make-channel))
(foo-in (make-queue))
(foo-out (make-queue)))
(submit-task c #'foo foo-in foo-out)
;; submit data to task (could be blocking)
(push-queue 0 foo-in)
;; wait for message from task (could be blocking too)
(pop-queue foo-out))))
但是如果有多个任务在运行,你怎么能避免在测试中进行轮询?您是否需要不断检查其中任何一个是否已完成,以便您可以将更多工作排到队列中?
您可以使用不同的并发机制,类似于listen和poll/epoll,您可以在其中查看多个事件源,并在其中一个事件准备就绪时做出反应。有一些语言,如Go(select)和Erlang(receive),这是很自然的表达方式。在Lisp方面,Calispel库提供了类似的交替机制(pri-alt
和fair-alt
)。例如,以下摘自Calispel的测试代码:
(pri-alt ((? control msg)
(ecase msg
(:clean-up (setf cleanup? t))
(:high-speed (setf slow? nil))
(:low-speed (setf slow? t))))
((? channel msg)
(declare (type fixnum msg))
(vector-push-extend msg out))
((otherwise :timeout (if cleanup? 0 nil))
(! reader-results out)
(! thread-expiration (bt:current-thread))
(return)))
在lparallel的情况下,没有这样的机制,但是如果你用标识符标记你的消息,你可以只使用队列。
如果您需要在任务t1
或t2
给出结果后立即做出反应,那么请将这两个任务写入相同的结果通道:
(let ((t1 (foo :id 1 :in i1 :out res))
(t2 (bar :id 2 :in i2 :out res)))
(destructuring-bind (id message) (pop-queue res)
(case id
(1 ...)
(2 ...))))
如果你需要在t1
和t2
发出结果时同步代码,让他们写入不同的通道:
(let ((t1 (foo :id 1 :in i1 :out o1))
(t2 (bar :id 2 :in i2 :out o2)))
(list (pop-queue o1)
(pop-queue o2)))