On Sun, 2010-01-03 at 17:34 +0100, Maciej Piechotka wrote:
> I have the following problem: I'd like to operate on big files, so I'd
> prefer to operate on a 'stream' instead of the whole file at a time, to
> avoid keeping too much in memory. I need to calculate the MD5 and
> compress the file.
>
> I tried to use something like this, but I'm afraid I'd need to patch
> the zlib package, as it results in a deadlock:

If I add:
> pipeline4 = do file <- oneToOneChannel' $ chanLabel "File"
>                data_ <- oneToOneChannel' $ chanLabel "Data"
>                compressed <- oneToOneChannel' $ chanLabel "Compressed"
>                runParallel_ [getFiles (writer file),
>                              readFromFile (reader file)
>                                           (writer data_),
>                              compressCHP (reader data_)
>                                          (writer compressed),
>                              CHP.consume (reader compressed)]

And change compress to the following (I haven't tested it without the
change, but here I omit the explicit interleave):

> stateM :: Monad m => a -> (a -> m a) -> m b
> stateM i f = f i >>= flip stateM f

Like forever, but with state.

> chanMaybe2List :: (ReadableChannel r, Poisonable (r (Maybe a)),
>                    WriteableChannel w, Poisonable (w [a]))
>                => r (Maybe a)
>                -> w [a]
>                -> CHP ()
> chanMaybe2List in_ out = do
>   chan <- liftIO $ newChan
>   list <- liftIO ((Just Nothing :) <$> getChanContents chan)
>   runParallel_ [forever (readChannel in_ >>=
>                          liftIO . writeChan chan . Just)
>                   `onPoisonRethrow` (liftIO (writeChan chan Nothing) >>
>                                      poison in_),
>                 forever $ stateM list process]
>   where process (Nothing :_) = poison out >> throwPoison
>         process (Just Nothing:Nothing:_) = poison out >> throwPoison
>         process (Just Nothing:xs) =
>           let (this, that) = span isJust xs
>               isJust = maybe False (maybe False (const True))
>               this' = map fromJust (map fromJust this)
>           in writeChannel out (map fromJust $ map fromJust this) >>
>              process that

Writes each run of input elements (terminated by Nothing) to the output
as a lazy list.

> compressCHP' :: (ReadableChannel r, Poisonable (r [BS.ByteString]),
>                  WriteableChannel w, Poisonable (w [BS.ByteString]))
>              => r [BS.ByteString]
>              -> w [BS.ByteString]
>              -> CHP ()
> compressCHP' in_ out = forever (writeChannel out .
>                                 LBS.toChunks . compress .
>                                 LBS.fromChunks =<<
>                                 readChannel in_)
>                        `onPoisonRethrow` (poison in_ >> poison out)

Compresses each list of chunks.

> toMaybeList :: (ReadableChannel r, Poisonable (r [a]),
>                 WriteableChannel w, Poisonable (w (Maybe a)))
>             => r [a]
>             -> w (Maybe a)
>             -> CHP ()
> toMaybeList in_ out = forever (readChannel in_ >>=
>                                mapM_ (writeChannel out . Just) >>
>                                writeChannel out Nothing)
>                       `onPoisonRethrow` (poison in_ >> poison out)

Converts the lists back into a Nothing-terminated stream.

> compressCHP :: (ReadableChannel r,
>                 Poisonable (r (Maybe BS.ByteString)),
>                 WriteableChannel w,
>                 Poisonable (w (Maybe BS.ByteString)))
>             => r (Maybe BS.ByteString)
>             -> w (Maybe BS.ByteString)
>             -> CHP ()
> compressCHP = chanMaybe2List |->| compressCHP' |->| toMaybeList

Combines all three operations.

However, pipeline3 still results in a deadlock:

Just "Test1\n"
(CHP) Thread terminated with: thread blocked indefinitely in an STM transaction
< _b3, _b4, _b4, File GZ."test1.gz", _c5, _b6, _c7, _b3 >

Well, at least I know where the problem is not.

Regards

_______________________________________________
Haskell-Cafe mailing list
[email protected]
http://www.haskell.org/mailman/listinfo/haskell-cafe
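The stateM combinator in the post never returns normally: it keeps feeding each result back in as the next state, so the loop ends only when the underlying monad aborts (in CHP, via throwPoison). A minimal pure sketch of that behaviour, using Either as a stand-in for poison; countTo and runCount are hypothetical helpers, not part of CHP:

```haskell
-- stateM as defined in the post: feed each result back in as the next state.
stateM :: Monad m => a -> (a -> m a) -> m b
stateM i f = f i >>= flip stateM f

-- Hypothetical step function: count up, and abort (via Left) once the
-- state reaches the given limit.
countTo :: Int -> Int -> Either Int Int
countTo limit n
  | n >= limit = Left n
  | otherwise  = Right (n + 1)

-- stateM never finishes on its own; only the monad's failure (here Left)
-- ends the loop, much as throwPoison ends a CHP process.
runCount :: Int -> Either Int ()
runCount limit = stateM 0 (countTo limit)
```

Loaded in GHCi, runCount 5 evaluates to Left 5: the state climbs from 0 to 5 and the Left carrying the final state is the only way out.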

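The channel protocol that chanMaybe2List and toMaybeList appear to implement (Just carries an element, Nothing closes a group) can be modelled with plain lists. This is a sketch under that assumption: toFramed, fromFramed and perGroup are hypothetical names, and perGroup takes an arbitrary per-group function where the real pipeline uses zlib's compress. It models only the data flow, not the concurrency, so it says nothing about where the deadlock comes from.

```haskell
import Data.Maybe (catMaybes, isNothing)

-- Hypothetical model of the framing used between the CHP processes:
-- Just x carries an element, Nothing closes the current group.

-- Roughly what toMaybeList emits: each group's elements, then Nothing.
toFramed :: [[a]] -> [Maybe a]
toFramed = concatMap (\xs -> map Just xs ++ [Nothing])

-- Roughly what chanMaybe2List does: split the stream at each Nothing.
-- Each group is returned before its terminator has been seen, so a long
-- (even infinite) group can be consumed incrementally. A final group
-- that is not followed by Nothing is still emitted.
fromFramed :: [Maybe a] -> [[a]]
fromFramed [] = []
fromFramed stream = catMaybes this : fromFramed (drop 1 rest)
  where
    (this, rest) = break isNothing stream

-- Pure analogue of chanMaybe2List |->| compressCHP' |->| toMaybeList,
-- with an arbitrary per-group function standing in for compress.
perGroup :: ([a] -> [b]) -> [Maybe a] -> [Maybe b]
perGroup f = toFramed . map f . fromFramed
```

For example, perGroup reverse [Just 1, Just 2, Nothing, Just 3, Nothing] gives [Just 2, Just 1, Nothing, Just 3, Nothing]. The lazy break in fromFramed mirrors the lazy span in chanMaybe2List, which is what lets a whole file flow through as one group without being held in memory.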