In Parallel and Concurrent Programming in Haskell, Simon Marlow provides a Stream a based on the following data type, together with some producers and consumers:
data IList a
  = Nil
  | Cons a (IVar (IList a))

type Stream a = IVar (IList a)

streamFromList :: NFData a => [a] -> Par (Stream a)
streamFromList xs = do
  var <- new
  fork $ loop xs var
  return var
  where
    loop [] var = put var Nil
    loop (x:xs) var = do
      tail <- new
      put var (Cons x tail)
      loop xs tail
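For reference, here is a minimal self-contained version of this unthrottled producer, paired with a consumer in the style of the book's streamFold. The NFData instance, the helper name sumStream and the example input are assumptions made for illustration. Nothing in this code stops the forked loop from racing to the end of the list before the fold reads a single element, which is exactly the situation described next.

```haskell
{-# LANGUAGE BangPatterns #-}
import Control.DeepSeq (NFData (..))
import Control.Monad.Par (IVar, Par, fork, get, new, put, runPar)

data IList a
  = Nil
  | Cons a (IVar (IList a))

-- put requires NFData; forcing the tail IVar does not look inside it
instance NFData a => NFData (IList a) where
  rnf Nil = ()
  rnf (Cons x tl) = rnf x `seq` rnf tl

type Stream a = IVar (IList a)

-- Producer: a single forked loop writes every cell, however far ahead it gets
streamFromList :: NFData a => [a] -> Par (Stream a)
streamFromList xs = do
  var <- new
  fork $ loop xs var
  return var
  where
    loop [] var = put var Nil
    loop (y : ys) var = do
      tl <- new
      put var (Cons y tl)
      loop ys tl

-- Consumer: strict left fold that blocks on each cell in turn
streamFold :: (a -> b -> a) -> a -> Stream b -> Par a
streamFold fn !acc instrm = do
  ilst <- get instrm
  case ilst of
    Nil -> return acc
    Cons h t -> streamFold fn (fn acc h) t

-- Hypothetical helper: sum 1..n through the stream
sumStream :: Int -> Int
sumStream n = runPar (streamFromList [1 .. n] >>= streamFold (+) 0)
```

Loaded in GHCi, sumStream 100 evaluates to 5050.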
Later, he points out a drawback of this approach and proposes a solution:
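In our previous example, the consumer was faster than the producer. If, instead, the producer were faster than the consumer, nothing would stop the producer from getting far ahead of the consumer and building up a long IList chain in memory. This is undesirable, because large heap data structures incur overhead due to garbage collection, so we might want to rate-limit the producer to keep it from getting too far ahead. There is a trick that adds some automatic rate-limiting to the stream API. It involves adding another constructor to the IList type: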
data IList a = Nil | Cons a (IVar (IList a)) | Fork (Par ()) (IList a)
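One way to read this constructor: a Fork pairs a suspended piece of the producer (the Par () action) with the list cell that precedes the pause. The hypothetical snippet below builds such a stream by hand. The cells after the first element are written only by the stored action, so a consumer that never forks it would never see the remaining elements. Forking the action is what lets the producer resume. The names handBuilt, consume and handBuiltDemo, and the NFData instance that skips the Par action, are assumptions made for this sketch.

```haskell
{-# LANGUAGE BangPatterns #-}
import Control.DeepSeq (NFData (..))
import Control.Monad.Par (IVar, Par, fork, get, new, put, runPar)

data IList a
  = Nil
  | Cons a (IVar (IList a))
  | Fork (Par ()) (IList a)

-- Assumption: a Par action cannot be deep-forced, so only the list part is
instance NFData a => NFData (IList a) where
  rnf Nil = ()
  rnf (Cons x tl) = rnf x `seq` rnf tl
  rnf (Fork _ l) = rnf l

type Stream a = IVar (IList a)

-- Hand-built stream: 1, then a paused producer that writes 2 and 3
handBuilt :: Par (Stream Int)
handBuilt = do
  s0 <- new
  s1 <- new
  let resume = do -- the suspended rest of the producer
        s2 <- new
        put s1 (Cons 2 s2)
        s3 <- new
        put s2 (Cons 3 s3)
        put s3 Nil
  put s0 (Fork resume (Cons 1 s1))
  return s0

-- Consumer: whenever it meets a Fork, it forks the stored producer first
consume :: Stream Int -> Par [Int]
consume = go []
  where
    go !acc s = do
      ilst <- get s
      step acc ilst
    step acc Nil = return (reverse acc)
    step acc (Cons x t) = go (x : acc) t
    step acc (Fork p rest) = fork p >> step acc rest

handBuiltDemo :: [Int]
handBuiltDemo = runPar (handBuilt >>= consume)
```

handBuiltDemo evaluates to [1,2,3].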
However, he does not finish this approach:
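I'll leave the rest of the implementation of this idea as an exercise for you to try yourself. See whether you can modify streamFromList, streamFold, and streamMap to incorporate the Fork constructor. The chunk size and fork distance should be parameters to the producers (streamFromList and streamMap).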
The same question has been asked on the mailing list, but nobody answered it.
So how can the producer be rate-limited?
The important part is the loop function:
loop [] var = put var Nil
loop (x:xs) var = do
  tail <- new
  put var (Cons x tail)
  loop xs tail
We need to add the fork distance f and the chunk size c as parameters:
loop _ _ [] var = put var Nil
loop 0 c (x:xs) var = -- see below
loop f c (x:xs) var = do
  tail <- new
  put var (Cons x tail)
  loop (f-1) c xs tail
The fork distance decreases with every iteration. What do we need to do when it reaches zero? We emit a Fork op t, where op continues producing the list:
loop 0 c (x:xs) var = do
  tail <- new
  let op = loop c c xs tail -- resume with the fork distance reset to c
  put var (Fork op (Cons x tail))
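Note that we don't emit a Fork if the list is empty. It would be possible, but rather pointless, since there is nothing left to produce. Changing streamFromList is now straightforward: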
streamFromList :: NFData a => Int -> Int -> [a] -> Par (Stream a)
streamFromList f c xs = do
  var <- new
  fork $ loop f c xs var
  return var
Now, to use it, we need to change the case expression in streamFold:
streamFold :: (a -> b -> a) -> a -> Stream b -> Par a
streamFold fn acc instrm = acc `seq` do
  ilst <- get instrm
  case ilst of
    Cons h t -> streamFold fn (fn acc h) t
    Fork p (Cons h t) -> -- see below
    _ -> return acc
Remember that our streamFromList never puts an empty list inside a Fork, but just in case, we match that (along with Nil) with the wildcard.
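What do we do when we encounter a Fork that carries data? First, we need to fork the Par () action so that t gets filled in, and then we can start consuming it. So our last case is: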
Fork p (Cons h t) -> fork p >> streamFold fn (fn acc h) t
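Putting the pieces of this answer together gives the self-contained sketch below. It uses Marlow's Fork (Par ()) (IList a) constructor and an NFData instance that skips the Par action; the instance is an assumption, since Par has no NFData instance. It also includes a hypothetical layout helper that writes | wherever a Fork sits in the stream, so you can see the effect of f and c.

```haskell
{-# LANGUAGE BangPatterns #-}
import Control.DeepSeq (NFData (..))
import Control.Monad.Par (IVar, Par, fork, get, new, put, runPar)

data IList a
  = Nil
  | Cons a (IVar (IList a))
  | Fork (Par ()) (IList a)

-- Assumption: Par () cannot be deep-forced, so only the list part of a Fork is
instance NFData a => NFData (IList a) where
  rnf Nil = ()
  rnf (Cons x tl) = rnf x `seq` rnf tl
  rnf (Fork _ l) = rnf l

type Stream a = IVar (IList a)

-- f: cells before the first Fork; c: distance used after each Fork
streamFromList :: NFData a => Int -> Int -> [a] -> Par (Stream a)
streamFromList f c xs = do
  var <- new
  fork $ loop f c xs var
  return var
  where
    loop _ _ [] var = put var Nil
    loop 0 c' (y : ys) var = do
      tl <- new
      -- pause here; whoever forks the action resumes with distance c'
      put var (Fork (loop c' c' ys tl) (Cons y tl))
    loop d c' (y : ys) var = do
      tl <- new
      put var (Cons y tl)
      loop (d - 1) c' ys tl

streamFold :: (a -> b -> a) -> a -> Stream b -> Par a
streamFold fn !acc instrm = do
  ilst <- get instrm
  case ilst of
    Cons h t -> streamFold fn (fn acc h) t
    Fork p (Cons h t) -> fork p >> streamFold fn (fn acc h) t
    _ -> return acc

-- Hypothetical helper: render the stream, writing "|" where a Fork sits
layout :: Show a => Stream a -> Par String
layout = go []
  where
    go acc s = get s >>= step acc
    step acc Nil = return (unwords (reverse acc))
    step acc (Cons x t) = go (show x : acc) t
    step acc (Fork p rest) = fork p >> step ("|" : acc) rest
```

With f = 2 and c = 3, runPar (streamFromList 2 3 [1 .. 8 :: Int] >>= layout) gives "1 2 | 3 4 5 6 | 7 8". There are two plain cells before the first Fork. After that, each Fork carries one element and is followed by c more.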
streamMap works similarly; only there you again need the additional parameters in your loop, as in streamFromList.
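One possible shape for that streamMap is sketched below. It is not from the book or from this answer. The map takes its own fork distance and chunk size, forks any upstream Fork as soon as it reaches it, and pauses itself by emitting a Fork whose action resumes the mapping. streamFromList and streamFold are restated from this answer so the block compiles on its own. The NFData instance that skips the Par action is an assumption.

```haskell
{-# LANGUAGE BangPatterns #-}
import Control.DeepSeq (NFData (..))
import Control.Monad.Par (IVar, Par, fork, get, new, put, runPar)

data IList a
  = Nil
  | Cons a (IVar (IList a))
  | Fork (Par ()) (IList a)

instance NFData a => NFData (IList a) where
  rnf Nil = ()
  rnf (Cons x tl) = rnf x `seq` rnf tl
  rnf (Fork _ l) = rnf l -- the Par action itself is not forced

type Stream a = IVar (IList a)

-- Producer from this answer (fork distance f, chunk size c)
streamFromList :: NFData a => Int -> Int -> [a] -> Par (Stream a)
streamFromList f c xs = do
  var <- new
  fork $ loop f c xs var
  return var
  where
    loop _ _ [] var = put var Nil
    loop 0 c' (y : ys) var = do
      tl <- new
      put var (Fork (loop c' c' ys tl) (Cons y tl))
    loop d c' (y : ys) var = do
      tl <- new
      put var (Cons y tl)
      loop (d - 1) c' ys tl

-- Sketch: a map stage with its own fork distance f and chunk size c
streamMap :: NFData b => Int -> Int -> (a -> b) -> Stream a -> Par (Stream b)
streamMap f c fn instrm = do
  outstrm <- new
  fork $ go f instrm outstrm
  return outstrm
  where
    -- k counts the plain output cells left before this stage pauses itself
    go k inp out = get inp >>= step k out
    step _ out Nil = put out Nil
    -- an upstream pause point: resume the upstream producer, then carry on
    step k out (Fork p rest) = fork p >> step k out rest
    -- our own pause point: the downstream consumer resumes us by forking it
    step 0 out (Cons h t) = do
      tl <- new
      put out (Fork (go c t tl) (Cons (fn h) tl))
    step k out (Cons h t) = do
      tl <- new
      put out (Cons (fn h) tl)
      go (k - 1) t tl

streamFold :: (a -> b -> a) -> a -> Stream b -> Par a
streamFold fn !acc instrm = do
  ilst <- get instrm
  case ilst of
    Cons h t -> streamFold fn (fn acc h) t
    Fork p (Cons h t) -> fork p >> streamFold fn (fn acc h) t
    _ -> return acc
```

For example, runPar (streamFromList 2 3 [1 .. 100 :: Int] >>= streamMap 4 4 (* 2) >>= streamFold (+) 0) evaluates to 10100. Because the map stops reading its input whenever it pauses, it also stops forking upstream Forks. The rate limit therefore propagates back to the original producer.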
I think the following is a valid implementation.
{-# LANGUAGE BangPatterns #-}

import Control.Monad.Par (IVar, Par, fork, get, new, put, put_, runPar)
import Control.DeepSeq (NFData, rnf)

data IList a
  = Nil
  | Cons a (IVar (IList a))
  | Fork (Par ()) (IVar (IList a))

instance NFData a => NFData (IList a) where
  rnf Nil = ()
  rnf (Cons a b) = rnf a `seq` rnf b
  -- The Par () action cannot be deep-forced and must not be run here,
  -- so only the stream part of a Fork is forced.
  rnf (Fork _ b) = rnf b

type Stream a = IVar (IList a)
main :: IO ()
main = print $ sum (pipeline [1 .. 10000])

pipeline :: [Int] -> [Int]
pipeline list = runPar $ do
  strm <- streamFromList list 100 200
  xs <- streamFold (\x y -> (y : x)) [] strm
  return (reverse xs)

streamFromList :: NFData a => [a] -> Int -> Int -> Par (Stream a)
streamFromList xs k n = do
  var <- new
  fork $ loop xs var k
  return var
  where
    loop [] var _ = put var Nil
    loop xs var 0 = do
      var' <- new
      put_ var (Fork (loop xs var' n) var')
    loop (x:xs) var i = do
      tail <- new
      put var (Cons x tail)
      loop xs tail (i - 1)

streamFold :: (a -> b -> a) -> a -> Stream b -> Par a
streamFold fn !acc strm = do
  ilst <- get strm
  case ilst of
    Nil -> return acc
    Cons h t -> streamFold fn (fn acc h) t
    Fork p s -> fork p >> streamFold fn acc s
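Here, streamFromList (the producer) writes values into the stream while streamFold consumes them in parallel. After the first k values, streamFromList puts a Fork into the stream and stops. That Fork contains the computation that produces the next n values, together with the stream from which those values can be consumed.
At this point, if the consumer has fallen behind the producer, it gets a chance to catch up. When it reaches the Fork, it forks the contained producer. Again, producer and consumer can proceed in parallel until, after another n values, the producer adds another Fork to the stream and the cycle repeats.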
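This answer only covers streamFromList and streamFold. The sketch below shows how streamMap could follow the same scheme; it is an extension, not part of the answer. The map forks any upstream Fork it meets. After its own first k outputs, and then after every n outputs, it pauses by writing a Fork whose action re-reads the current input cell when resumed. The layout helper is hypothetical and only renders | wherever a Fork appears. The NFData instance that skips the Par action is an assumption.

```haskell
{-# LANGUAGE BangPatterns #-}
import Control.DeepSeq (NFData (..))
import Control.Monad.Par (IVar, Par, fork, get, new, put, put_, runPar)

data IList a
  = Nil
  | Cons a (IVar (IList a))
  | Fork (Par ()) (IVar (IList a))

instance NFData a => NFData (IList a) where
  rnf Nil = ()
  rnf (Cons x tl) = rnf x `seq` rnf tl
  rnf (Fork _ v) = rnf v -- the Par action is never run or forced here

type Stream a = IVar (IList a)

-- Producer from this answer: first Fork after k cells, then every n cells
streamFromList :: NFData a => [a] -> Int -> Int -> Par (Stream a)
streamFromList xs k n = do
  var <- new
  fork $ loop xs var k
  return var
  where
    loop [] var _ = put var Nil
    loop ys var 0 = do
      var' <- new
      put_ var (Fork (loop ys var' n) var')
    loop (y : ys) var i = do
      tl <- new
      put var (Cons y tl)
      loop ys tl (i - 1)

-- Sketch: the same pausing scheme for a map stage
streamMap :: NFData b => (a -> b) -> Stream a -> Int -> Int -> Par (Stream b)
streamMap fn instrm k n = do
  outstrm <- new
  fork $ loop instrm outstrm k
  return outstrm
  where
    loop inp out i = do
      ilst <- get inp
      case ilst of
        Nil -> put out Nil
        -- upstream pause point: resume the upstream producer, keep our count
        Fork p s -> fork p >> loop s out i
        Cons h t
          | i == 0 -> do
              -- our pause point: resuming re-reads inp, so h is not lost
              out' <- new
              put_ out (Fork (loop inp out' n) out')
          | otherwise -> do
              tl <- new
              put out (Cons (fn h) tl)
              loop t tl (i - 1)

streamFold :: (a -> b -> a) -> a -> Stream b -> Par a
streamFold fn !acc strm = do
  ilst <- get strm
  case ilst of
    Nil -> return acc
    Cons h t -> streamFold fn (fn acc h) t
    Fork p s -> fork p >> streamFold fn acc s

-- Hypothetical helper: render the stream, writing "|" where a Fork sits
layout :: Show a => Stream a -> Par String
layout = go []
  where
    go acc s = do
      ilst <- get s
      case ilst of
        Nil -> return (unwords (reverse acc))
        Cons x t -> go (show x : acc) t
        Fork p s' -> fork p >> go ("|" : acc) s'
```

With this code, runPar (streamFromList [1 .. 7 :: Int] 2 3 >>= layout) gives "1 2 | 3 4 5 | 6 7". Piping the same stream through streamMap (* 10) with k = 1 and n = 2 gives "10 | 20 30 | 40 50 | 60 70". Only the map's own pause points remain visible downstream, because it consumes the upstream Forks itself.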
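In this implementation, each Fork is placed in the middle of the generated list: it wraps the first element of a chunk, and its action produces the next chunk. The next chunk is therefore generated as soon as the consumer reaches the start of the current one.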
import Control.DeepSeq
import Control.Monad.Par

data IList a
  = Nil -- need to be NFData
  | Cons a (IVar (IList a))
  | Fork (Par ()) (IList a)

instance NFData a => NFData (IList a) where
  rnf Nil = ()
  rnf (Cons x xs) = rnf x `seq` rnf xs
  rnf (Fork c l) = rnf l

type Stream a = IVar (IList a)

-- >>> runPar $ (streamFromList 3 [1 .. 10]) >>= (streamFold (+) 0)
-- 55

streamFromList :: NFData a => Int -> [a] -> Par (Stream a)
streamFromList chunkSize xs = do
  dt <- new
  dl <- new
  put dl xs
  fork $ next chunkSize dt dl
  return dt
  where
    next :: NFData a => Int -> Stream a -> IVar [a] -> Par ()
    next 1 dt dl = do
      ilist <- get dl
      case ilist of
        [] -> put dt Nil
        (x:xs) -> do
          delaytail <- new
          delaylist <- new
          put delaylist xs
          put dt (Fork (next 1 delaytail delaylist) (Cons x delaytail))
    next chunkSize dt dl = do
      ilist <- get dl
      case ilist of
        [] -> put dt Nil
        (x : xs) -> do
          delaytail <- new
          delaylist <- new
          tail <- new
          put
            dt
            ( Fork
                (next chunkSize delaytail delaylist)
                (Cons x tail)
            )
          loop xs tail delaytail delaylist (chunkSize - 2)
    loop :: NFData a => [a] -> Stream a -> Stream a -> IVar [a] -> Int -> Par ()
    loop [] var _ dl _ = do
      put var Nil
      put dl []
    loop (x : xs) var dt dl count =
      if count /= 0
        then do
          tail <- new
          put var (Cons x tail)
          loop xs tail dt dl (count - 1)
        else do
          put var (Cons x dt)
          put dl xs

streamFold :: (a -> b -> a) -> a -> Stream b -> Par a
streamFold fn acc instrm = do
  ilist <- get instrm
  case ilist of
    Nil -> return acc
    Cons h t -> streamFold fn (fn acc h) t
    Fork p Nil -> return acc
    Fork p (Cons h t) -> do
      fork p
      streamFold fn (fn acc h) t
-- >>> runPar $ (streamFromList 3 [1 .. 10]) >>= (streamMap (*2)) >>= (streamFold (+) 0)
-- 110

streamMap :: (NFData a, NFData b) => (a -> b) -> Stream a -> Par (Stream b)
streamMap fn instrm = do
  outstrm <- new
  fork $ init fn instrm outstrm
  return outstrm
  where
    init :: (NFData a, NFData b) => (a -> b) -> Stream a -> Stream b -> Par ()
    init fn instrm outstrm = do
      ilst <- get instrm
      case ilst of
        Nil -> put outstrm Nil
        Cons h t -> do
          newtl <- new
          put outstrm (Cons (fn h) newtl)
          init fn t newtl
        Fork p Nil -> put outstrm Nil
        Fork p (Cons h t) -> do
          fork p
          slist <- get t
          case slist of
            Nil -> do
              newtl <- new
              put newtl Nil
              put outstrm (Cons (fn h) newtl)
            Cons h1 t1 -> do
              newtl <- new
              delaytail <- new
              delaystrm <- new
              put outstrm (Fork (init fn delaystrm delaytail) (Cons (fn h) newtl))
              loopCons fn h1 t1 newtl delaytail delaystrm
            Fork p1 Nil -> do
              delaytail <- new
              put outstrm (Fork (put delaytail Nil) (Cons (fn h) delaytail))
            Fork p1 (Cons h1 t1) -> do
              delaytail <- new
              delaystrm <- new
              -- hand the upstream Fork on to the resumed init; without this
              -- put, delaystrm is never filled and the consumer blocks forever
              put delaystrm slist
              put outstrm (Fork (init fn delaystrm delaytail) (Cons (fn h) delaytail))
    loopCons :: (NFData a, NFData b) => (a -> b) -> a -> Stream a -> Stream b -> Stream b -> Stream a -> Par ()
    loopCons fn h t var dl ds = do
      tlist <- get t
      case tlist of
        Nil -> do
          newtl <- new
          put newtl Nil
          put var (Cons (fn h) newtl)
          put ds Nil
        Cons h1 t1 -> do
          newtl <- new
          put var (Cons (fn h) newtl)
          loopCons fn h1 t1 newtl dl ds
        Fork p Nil -> do
          newtl <- new
          put newtl Nil
          put var (Cons (fn h) newtl)
        Fork p (Cons h1 t1) -> do
          put ds tlist
          put var (Cons (fn h) dl)