我有一个相对简单的“复制”程序,该程序仅将一个文件的所有行复制到另一个文件。我玩弄Haskell的并发支持TMQueue
和STM
所以我想我会尝试这样的:
{-# LANGUAGE BangPatterns #-}
module Main where
import Control.Applicative
import Control.Concurrent.Async -- from async
import Control.Concurrent.Chan
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TMQueue -- from stm-chans
import Control.Monad (replicateM, forM_, forever, unless)
import qualified Data.ByteString.Char8 as B
import Data.Function (fix)
import Data.Maybe (catMaybes, maybe)
import System.IO (withFile, IOMode(..), hPutStrLn, hGetLine)
import System.IO.Error (catchIOError)
input = "data.dat"
output = "out.dat"
batch = 100 :: Int
consumer :: TMQueue B.ByteString -> IO ()
consumer q = withFile output WriteMode $ \fh -> fix $ \loop -> do
!items <- catMaybes <$> replicateM batch readitem
forM_ items $ B.hPutStrLn fh
unless (length items < batch) loop
where
readitem = do
!item <- atomically $ readTMQueue q
return item
producer :: TMQueue B.ByteString -> IO ()
producer q = withFile input ReadMode $ \fh ->
(forever (B.hGetLine fh >>= atomically . writeTMQueue q))
`catchIOError` const (atomically (closeTMQueue q) >> putStrLn "Done")
main :: IO ()
main = do
q <- atomically newTMQueue
thread <- async $ consumer q
producer q
wait thread
我可以像这样制作一些测试输入文件
ghc -e 'writeFile "data.dat" (unlines (map show [1..5000000]))'
并像这样构建它
ghc --make QueueTest.hs -O2 -prof -auto-all -caf-all -threaded -rtsopts -o q
当我这样运行时./q +RTS -s -prof -hc -L60 -N2
,它说“正在使用的总内存为2117 MB”!但是输入文件只有38 MB!
我是剖析的新手,但是我已经生成了一个又一个的图,并且无法查明我的错误。
正如OP指出的那样,到目前为止,我还是可以写一个真实的答案。让我们从内存消耗开始。
两个有用的参考是Haskell数据类型的内存占用量和http://blog.johantibell.com/2011/06/memory-footprints-of-some-common-data.html。我们还需要查看一些结构的定义。
-- from http://hackage.haskell.org/package/stm-chans-3.0.0.2/docs/src/Control-Concurrent-STM-TMQueue.html
data TMQueue a = TMQueue
{-# UNPACK #-} !(TVar Bool)
{-# UNPACK #-} !(TQueue a)
deriving Typeable
-- from http://hackage.haskell.org/package/stm-2.4.3/docs/src/Control-Concurrent-STM-TQueue.html
-- | 'TQueue' is an abstract type representing an unbounded FIFO channel.
data TQueue a = TQueue {-# UNPACK #-} !(TVar [a])
{-# UNPACK #-} !(TVar [a])
该TQueue
实现使用具有读取端和写入端的标准功能队列。
让我们为内存使用量设置一个上限,并假设我们TMQueue
在使用者执行任何操作之前将整个文件读入。在这种情况下,我们的TQueue的写端将包含一个列表,每一输入行包含一个元素(存储为字节串)。每个列表节点看起来像
(:) bytestring tail
这需要3个字(每个字段1个,构造函数1个)。每个字节串为9个字,因此将这两个字加在一起,每行的开销为12个字,不包括实际数据。您的测试数据为500万行,因此整个文件的开销为6000万个字(加上一些常量),在64位系统上约为460MB(假设我做得对,总是很可疑)。添加40MB作为实际数据,我们得到的值非常接近我在系统上看到的值。
那么,为什么我们的内存使用量接近这个上限?我有一个理论(作为练习剩下的调查!)。首先,生产者的运行速度可能比消费者更快,这仅仅是因为读取通常比写入快(我正在使用旋转磁盘,也许SSD会有所不同)。这是readTQueue的定义:
-- |Read the next value from the 'TQueue'.
readTQueue :: TQueue a -> STM a
readTQueue (TQueue read write) = do
xs <- readTVar read
case xs of
(x:xs') -> do writeTVar read xs'
return x
[] -> do ys <- readTVar write
case ys of
[] -> retry
_ -> case reverse ys of
[] -> error "readTQueue"
(z:zs) -> do writeTVar write []
writeTVar read zs
return z
首先,我们尝试从读取端进行读取,如果该列为空,则在反转该列表之后,尝试从写入端进行读取。
我认为正在发生的事情是:当使用者需要从写端进行读取时,它需要遍历STM事务中的输入列表。这需要一些时间,这将使其与生产者竞争。随着生产者进一步前进,此列表将更长,从而导致读取花费更多时间,在此期间,生产者能够写入更多值,从而导致读取失败。重复此过程,直到生产者完成为止,然后消费者才有机会处理大量数据。这不仅会破坏并发性,还会增加更多的CPU开销,因为使用者事务不断重试和失败。
那么,鱼呢?有几个主要区别。首先,unagi-chan在内部使用数组而不是列表。这样可以稍微减少开销。大部分开销来自ByteString指针,所以不是很多,而是很少。其次,木会保留大量的数组。即使我们悲观地认为生产者总是赢得竞争,但在数组被填充后,它会被推离通道的生产者一侧。现在,生产者正在写一个新数组,而消费者则从旧数组中读取。这种情况是近乎理想的。没有共享资源的争用,使用者具有良好的引用位置,并且由于使用者正在处理不同的内存块,因此缓存一致性没有问题。与我的理论描述不同TMQueue
,现在您可以进行并发操作,从而允许生产者清除一些内存使用量,从而使其永远不会达到上限。
顺便说一句,我认为消费者分批处理没有好处。句柄已经由IO子系统缓冲,因此我认为这样做没有任何好处。对我来说,当我改变消费者以逐行方式进行操作时,性能会有所提高。
现在,您可以如何解决此问题?TMQueue
从存在争议问题的工作假设以及您指定的要求出发,您只需要使用其他类型的队列即可。显然菜效果很好。我还尝试过TMChan
,它比unagi慢25%,但使用的内存少45%,因此这也是一个不错的选择。(这并不奇怪,TMChan
具有TMQueue
与之不同的结构,因此具有不同的性能特征)
您也可以尝试更改算法,以便生产者发送多行块。这将降低所有ByteStrings的内存开销。
那么,什么时候可以使用TMQueue
?如果生产者和消费者的速度大致相同,或者消费者的速度更快,那应该没问题。另外,如果处理时间不一致,或者生产者突发运行,则可能会获得良好的摊销性能。这几乎是最坏的情况,也许应该将其报告为针对stm
?我认为如果将读取功能更改为
-- |Read the next value from the 'TQueue'.
readTQueue :: TQueue a -> STM a
readTQueue (TQueue read write) = do
xs <- readTVar read
case xs of
(x:xs') -> do writeTVar read xs'
return x
[] -> do ys <- readTVar write
case ys of
[] -> retry
_ -> do writeTVar write []
let (z:zs) = reverse ys
writeTVar read zs
return z
这样可以避免这个问题。现在,z
和zs
绑定都应该被懒惰地求值,因此列表遍历将在此事务之外进行,从而使读取操作有时在争用条件下也可以成功。当然,假设我首先对这个问题是正确的(并且这个定义足够懒惰)。但是,可能还有其他意外的缺点。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句