Michael,
On Tue, 2012-11-27 at 17:14 +0200, Michael Snoyman wrote:
> I think the stm-conduit package[1] may be helpful for this use case.
> Each time you get a new command, you can fork a thread and give it the
> TBMChan to write to, and you can use sourceTBMChan to get a source to
> send to the client.
That's more or less what I had in mind. I found stm-conduit earlier and
tried to get this working with it, but those attempts failed.
I attached an example which might clarify what I intend to do. I'm aware
it contains several potential bugs (leaking threads, etc.), but that's
beside the point ;-)
If only I could figure out what to put in place of the 3 comment lines
I left in there...
Thanks for your help,
Nicolas
{-# LANGUAGE Rank2Types #-}
module Main where

import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.Conduit.TMChan

import Control.Applicative
import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically)
import Control.Monad (forM_)
import Control.Monad.IO.Class (MonadIO, liftIO)

data Command = Add Int Int
             | Disconnect
  deriving (Show)

data Reply = Result Int
  deriving (Show)

application :: MonadIO m => GConduit Int m String
application = do
    -- Create input and output channels to/from worker threads
    (chanIn, chanOut) <- liftIO $ (,) <$> newTBMChanIO 10 <*> newTBMChanIO 10

    -- Spawn some worker threads
    liftIO $ forM_ [0..5] $ \i -> forkIO $ processCommands i chanIn chanOut

    -- How to make
    --     sourceTBMChan chanOut
    -- something of which all produced values are yield'ed by this Conduit?

    loop chanIn
  where
    -- Loop retrieves one command from our source and pushes it to the
    -- worker threads input channel, then loops
    loop :: MonadIO m => TBMChan Command -> GConduit Int m String
    loop chan = do
        liftIO $ putStrLn "Enter loop"
        cmd <- getCommand
        liftIO $ do
            putStrLn $ "Got command: " ++ show cmd
            atomically $ writeTBMChan chan cmd
        case cmd of
            Disconnect -> return ()
            _ -> loop chan

    -- getCommand fetches and parses a single command from our source
    getCommand :: Monad m => GSink Int m Command
    getCommand = do
        v <- await
        case v of
            Nothing -> return Disconnect
            Just i -> return $ Add i 1

-- processCommands reads commands from a given input channel, processes
-- them, and pushes the result to a given output channel
processCommands :: Int -> TBMChan Command -> TBMChan Reply -> IO ()
processCommands i chanIn chanOut = do
    putStrLn $ "Enter processCommands " ++ show i
    cmd <- atomically $ readTBMChan chanIn
    putStrLn $ show i ++ " read command: " ++ show cmd
    case cmd of
        Nothing -> return ()
        Just (Add a b) -> do
            atomically $ writeTBMChan chanOut (Result (a + b))
            putStrLn $ show i ++ " pushed result"
            processCommands i chanIn chanOut
        Just Disconnect -> return ()

main :: IO ()
main = do
    res <- CL.sourceList [1..20] $= application $$ CL.consume
    putStrLn $ "Result: " ++ show res
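
For comparison, the worker side of the pattern Michael describes in the
quoted text (fork workers, hand them a bounded channel, drain that channel
on the other end) can be sketched without conduit at all. This is only a
rough sketch under stated assumptions: it swaps TBMChan for a plain TBQueue
from the stm package carrying Maybe values, with Nothing as an end marker,
and the names worker, drain and runPool are hypothetical. It does not answer
the open question of how to yield the replies from inside the Conduit.

```haskell
module Main where

import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Monad (forM_, replicateM_)
import Data.List (sort)

-- Simplified stand-ins for the Command and Reply types in the attachment.
data Command = Add Int Int
newtype Reply = Result Int deriving (Show, Eq, Ord)

-- Assumption: 'Nothing' marks end of input (instead of closing a TBMChan).
-- Each worker answers commands until it reads an end marker, then pushes
-- its own end marker so the consumer can count finished workers.
worker :: TBQueue (Maybe Command) -> TBQueue (Maybe Reply) -> IO ()
worker chanIn chanOut = loop
  where
    loop = do
      cmd <- atomically $ readTBQueue chanIn
      case cmd of
        Nothing -> atomically $ writeTBQueue chanOut Nothing
        Just (Add a b) -> do
          atomically $ writeTBQueue chanOut (Just (Result (a + b)))
          loop

-- Collect replies until all n workers have signalled completion.
drain :: Int -> TBQueue (Maybe Reply) -> IO [Reply]
drain 0 _ = return []
drain n chanOut = do
  r <- atomically $ readTBQueue chanOut
  case r of
    Nothing -> drain (n - 1) chanOut
    Just x  -> fmap (x :) (drain n chanOut)

runPool :: Int -> [Command] -> IO [Reply]
runPool nWorkers cmds = do
  chanIn  <- newTBQueueIO 10
  chanOut <- newTBQueueIO 10
  replicateM_ nWorkers $ forkIO $ worker chanIn chanOut
  -- Feed commands from a separate thread so the bounded queues cannot
  -- block the caller, then send one end marker per worker.
  _ <- forkIO $ do
    forM_ cmds $ atomically . writeTBQueue chanIn . Just
    replicateM_ nWorkers $ atomically $ writeTBQueue chanIn Nothing
  drain nWorkers chanOut

main :: IO ()
main = do
  replies <- runPool 3 [Add i 1 | i <- [1 .. 20]]
  -- Replies arrive in whatever order the workers finish, so sort first.
  print (sort replies)
```

Feeding and draining run in different threads here, which is exactly the
part that is awkward to express inside a single Conduit: the loop in the
attachment only ever awaits input and never reads chanOut.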