Control/Concurrent.hs (original) (raw)
module Control.Concurrent (
ThreadId,
#ifdef GLASGOW_HASKELL myThreadId, #endif
forkIO,
#ifdef GLASGOW_HASKELL forkFinally, forkIOWithUnmask, killThread, throwTo, #endif
forkOn,
forkOnWithUnmask,
getNumCapabilities,
setNumCapabilities,
threadCapability,
yield,
#ifdef GLASGOW_HASKELL
threadDelay,
threadWaitRead,
threadWaitWrite,
#endif
module Control.Concurrent.MVar,
module Control.Concurrent.Chan,
module Control.Concurrent.QSem,
module Control.Concurrent.QSemN,
module Control.Concurrent.SampleVar,
#ifndef HUGS
mergeIO,
nmergeIO,
#endif
#ifdef GLASGOW_HASKELL
rtsSupportsBoundThreads,
forkOS,
isCurrentThreadBound,
runInBoundThread,
runInUnboundThread,
#endif
mkWeakThreadId,
forkIOUnmasked
) where
import Prelude
import Control.Exception.Base as Exception
#ifdef GLASGOW_HASKELL import GHC.Exception import GHC.Conc hiding (threadWaitRead, threadWaitWrite) import qualified GHC.Conc import GHC.IO ( IO(..), unsafeInterleaveIO, unsafeUnmask ) import GHC.IORef ( newIORef, readIORef, writeIORef ) import GHC.Base
import System.Posix.Types ( Fd ) import Foreign.StablePtr import Foreign.C.Types import Control.Monad ( when )
#ifdef mingw32_HOST_OS import Foreign.C import System.IO #endif #endif
#ifdef HUGS import Hugs.ConcBase #endif
import Control.Concurrent.MVar import Control.Concurrent.Chan import Control.Concurrent.QSem import Control.Concurrent.QSemN import Control.Concurrent.SampleVar
-- Under Hugs, ThreadId is declared in this module as a synonym for the unit type.
#ifdef HUGS type ThreadId = () #endif
-- | Fork a thread that runs @action@ and then passes its outcome to
-- @and_then@: @Left e@ when the action raised the exception @e@, @Right a@
-- when it returned @a@.
--
-- The fork is performed inside 'mask'; only @action@ itself is run under the
-- caller's original masking state (through the restore function handed out
-- by 'mask'), so @and_then@ is not wrapped in that restore.
forkFinally :: IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally action and_then =
  mask $ \unmask ->
    forkIO (try (unmask action) >>= and_then)
#ifndef HUGS
-- | Initial value of the semaphore shared by all producer threads of one
-- merge.  A producer must acquire the semaphore before appending an element,
-- and it is released again each time the consumer demands a further cell of
-- the merged list.
max_buff_size :: Int
max_buff_size = 1

-- | Merge two lists concurrently: each input list is walked by its own
-- forked thread and the elements are appended to a single output list in
-- whatever order the threads produce them.  Equivalent to 'nmergeIO' applied
-- to the two-element list of inputs.
mergeIO :: [a] -> [a] -> IO [a]
mergeIO ls rs = nmergeIO [ls, rs]

-- | Shared state of one merge: an 'MVar' holding the (still empty) 'MVar'
-- that will receive the next cell of the output list, plus the semaphore
-- that throttles the producers.
type Buffer a = (MVar (MVar [a]), QSem)

-- | Producer loop run by every forked thread.
--
-- * On an exhausted input the running-branches counter is taken.  If this
--   was the last running branch the output list is terminated with @[]@
--   (and the counter is deliberately left empty); otherwise the counter is
--   put back decremented by one.
--
-- * On @x:xs@ the thread waits for the semaphore, claims the current tail
--   cell, allocates the next tail cell and fills the claimed cell with
--   @x : rest@, where @rest@ is a lazily performed read of the next cell
--   that also signals the semaphore when it is eventually forced.
suckIO :: MVar Int -> Buffer a -> [a] -> IO ()
suckIO branches_running buff@(tail_list, e) vs =
  case vs of
    [] -> do
      val <- takeMVar branches_running
      if val == 1
        then do
          node <- takeMVar tail_list
          putMVar node []
          putMVar tail_list node
        else putMVar branches_running (val - 1)
    (x : xs) -> do
      waitQSem e
      node <- takeMVar tail_list
      next_node <- newEmptyMVar
      -- Deferred: runs only when the consumer forces the tail of this cell.
      next_node_val <- unsafeInterleaveIO $ do
        y <- takeMVar next_node
        signalQSem e
        return y
      putMVar node (x : next_node_val)
      putMVar tail_list next_node
      suckIO branches_running buff xs

-- | Merge any number of lists concurrently, one forked thread per input
-- list.  The order of elements coming from different inputs is not
-- determined; elements of a single input keep their relative order.
--
-- Merging no lists at all yields the empty list.  Without that case no
-- producer would exist to fill the first output cell and the call would
-- block forever.
nmergeIO :: [[a]] -> IO [a]
nmergeIO [] = return []
nmergeIO lss = do
  tail_node <- newEmptyMVar
  tail_list <- newMVar tail_node
  e <- newQSem max_buff_size
  branches_running <- newMVar (length lss)
  let buff = (tail_list, e)
  mapM_ (forkIO . suckIO branches_running buff) lss
  -- Wait for the first cell, then allow the next element to be produced.
  val <- takeMVar tail_node
  signalQSem e
  return val
#endif /* HUGS */
#ifdef GLASGOW_HASKELL
-- | 'True' when the runtime system supports bound threads.  The value is
-- imported from the C symbol of the same name.  'forkOS' and
-- 'runInBoundThread' call 'failNonThreaded' when it is 'False'.
--
-- NOTE(review): this section is guarded by an ifdef on GLASGOW_HASKELL, but
-- GHC itself predefines the macro with two leading and two trailing
-- underscores. Confirm the build defines the unprefixed name, otherwise this
-- whole section is compiled out.
foreign import ccall rtsSupportsBoundThreads :: Bool
-- | Run the given action in a thread started through the C routine
-- @forkOS_createThread@ and return the new thread's 'ThreadId'.
--
-- Fails in 'IO' via 'failNonThreaded' when 'rtsSupportsBoundThreads' is
-- 'False', and with @"Cannot create OS thread."@ when the C routine reports
-- a non-zero result.
-- NOTE(review): presumably the new thread is a bound (OS) thread, as the
-- name suggests -- confirm against the RTS.
forkOS :: IO () -> IO ThreadId
-- Make 'forkOS_entry' callable from C under the symbol @forkOS_entry@.
foreign export ccall forkOS_entry :: StablePtr (IO ()) -> IO ()
-- Import that same C symbol back into Haskell as a foreign call; used by
-- 'runInBoundThread' to run an action through the C entry point.
foreign import ccall "forkOS_entry" forkOS_entry_reimported :: StablePtr (IO ()) -> IO ()
-- | Haskell side of the exported @forkOS_entry@ symbol: look up the 'IO'
-- action behind the stable pointer and run it.  The pointer itself is not
-- freed here.
forkOS_entry :: StablePtr (IO ()) -> IO ()
forkOS_entry stableAction =
  -- Binding with 'id' runs the action that the dereference returns.
  deRefStablePtr stableAction >>= id
-- C routine used by 'forkOS' to start the new thread.  It receives a stable
-- pointer to the action to run; 'forkOS' treats a non-zero result as
-- failure.
foreign import ccall forkOS_createThread :: StablePtr (IO ()) -> IO CInt
-- | Fail in 'IO' with the message used when an operation needs a runtime
-- system that supports multiple OS threads.
failNonThreaded :: IO a
failNonThreaded = fail message
  where
    message =
      "RTS doesn't support multiple OS threads "
        ++ "(use ghc -threaded when linking)"
-- Implementation of 'forkOS'.
--
-- The action is wrapped so that it (1) reports its own 'ThreadId' back to
-- the caller through an 'MVar' and (2) runs under 'childHandler'.  It is
-- handed to the C routine through a stable pointer, which is released once
-- the child has reported its 'ThreadId' -- or immediately, if the thread
-- could not be created.
forkOS action0
  | rtsSupportsBoundThreads = do
      mv <- newEmptyMVar
      b <- Exception.getMaskingState
      let
        -- Reproduce the caller's masking state in the child.
        -- NOTE(review): leaving the action untouched for MaskedInterruptible
        -- presumes the new thread starts in that state -- confirm against
        -- forkOS_createThread.
        action1 = case b of
          Unmasked -> unsafeUnmask action0
          MaskedInterruptible -> action0
          MaskedUninterruptible -> uninterruptibleMask_ action0

        action_plus = Exception.catch action1 childHandler

      entry <- newStablePtr (myThreadId >>= putMVar mv >> action_plus)
      err <- forkOS_createThread entry
      when (err /= 0) $ do
        -- A non-zero result is treated as "no thread was created", so nothing
        -- will ever dereference the pointer: release it before failing
        -- instead of leaking it.
        freeStablePtr entry
        fail "Cannot create OS thread."
      tid <- takeMVar mv
      -- The child has already dereferenced the pointer by the time it
      -- reports its ThreadId, so it is safe to free it here.
      freeStablePtr entry
      return tid
  | otherwise = failNonThreaded
-- | Ask the runtime, through the @isCurrentThreadBound#@ primitive, whether
-- the calling thread is bound.  The primitive answers with an unboxed
-- integer flag: zero is reported as 'False', anything else as 'True'.
isCurrentThreadBound :: IO Bool
isCurrentThreadBound = IO $ \st0 ->
  case isCurrentThreadBound# st0 of
    (# st1, 0# #) -> (# st1, False #)
    (# st1, _ #) -> (# st1, True #)
-- | Run an action so that it executes in a bound thread.
--
-- * Without bound-thread support in the runtime this fails via
--   'failNonThreaded'.
--
-- * If the calling thread is already bound the action is run directly.
--
-- * Otherwise the action is routed through the re-imported C entry point
--   @forkOS_entry@.  Its outcome (result or exception) is captured in an
--   'IORef' and replayed in the caller with 'unsafeResult'.  The stable
--   pointer handed to the entry point is freed by 'bracket'.
runInBoundThread :: IO a -> IO a
runInBoundThread action
  | not rtsSupportsBoundThreads = failNonThreaded
  | otherwise = do
      alreadyBound <- isCurrentThreadBound
      if alreadyBound then action else viaForeignEntry
  where
    viaForeignEntry = do
      -- Placeholder contents; overwritten before it is read, since the
      -- wrapped action always stores its outcome.
      resultRef <- newIORef undefined
      let recordOutcome = Exception.try action >>= writeIORef resultRef
      outcome <-
        bracket (newStablePtr recordOutcome) freeStablePtr $ \cEntry -> do
          forkOS_entry_reimported cEntry
          readIORef resultRef
      unsafeResult outcome
-- | Run an action in a thread that is not bound.
--
-- If the calling thread is not bound the action is simply run in place.
-- Otherwise a fresh thread is forked for it (with the caller's masking state
-- restored around the action) and the caller waits for the outcome, which
-- is replayed with 'unsafeResult': the result is returned, or the exception
-- the action raised is rethrown.
--
-- While waiting, any exception delivered to the calling thread is forwarded
-- to the worker with 'Exception.throwTo', after which the caller resumes
-- waiting for the worker's outcome.
runInUnboundThread :: IO a -> IO a
runInUnboundThread action = do
  bound <- isCurrentThreadBound
  if bound
    then do
      mv <- newEmptyMVar
      mask $ \restore -> do
        tid <- forkIO $ Exception.try (restore action) >>= putMVar mv
        let wait =
              -- The handler is a lambda applied infix; the annotation on its
              -- argument makes it catch every exception type.
              takeMVar mv `Exception.catch` \e ->
                Exception.throwTo tid (e :: SomeException) >> wait
        wait >>= unsafeResult
    else action
-- | Replay a captured outcome in the current thread: rethrow the exception
-- held in a 'Left', or return the value held in a 'Right'.
unsafeResult :: Either SomeException a -> IO a
unsafeResult (Left e) = Exception.throwIO e
unsafeResult (Right v) = return v
#endif /* GLASGOW_HASKELL */
#ifdef GLASGOW_HASKELL
-- | Block the calling thread until the given file descriptor has data
-- available for reading.
--
-- Outside Windows this is exactly 'GHC.Conc.threadWaitRead'.
--
-- On Windows, with a threaded runtime, the wait is performed by @fdReady@ in
-- a helper thread.  Without one, only file descriptor 0 is supported, by
-- waiting on 'stdin'; any other descriptor is an 'error'.
threadWaitRead :: Fd -> IO ()
threadWaitRead fd
#ifdef mingw32_HOST_OS
  | threaded = withThread (waitFd fd 0)
  | otherwise = case fd of
      0 -> do
        -- A negative timeout makes hWaitForInput wait indefinitely; a
        -- positive one would return after that many milliseconds whether or
        -- not input is ready.
        _ <- hWaitForInput stdin (-1)
        return ()
      _ -> error "threadWaitRead requires -threaded on Windows, or use System.IO.hWaitForInput"
#else
  = GHC.Conc.threadWaitRead fd
#endif
-- | Block the calling thread until the given file descriptor can be written
-- to.
--
-- Outside Windows this is exactly 'GHC.Conc.threadWaitWrite'.  On Windows
-- the wait is performed by @fdReady@ in a helper thread and needs a threaded
-- runtime; without one the call is an 'error'.
threadWaitWrite :: Fd -> IO ()
threadWaitWrite fd
#ifdef mingw32_HOST_OS
  | threaded = withThread (waitFd fd 1)
  | otherwise = error "threadWaitWrite requires -threaded on Windows"
#else
  = GHC.Conc.threadWaitWrite fd
#endif
-- Windows only: a second import of the C symbol "rtsSupportsBoundThreads",
-- here as an unsafe call under the name threaded.  It selects the code path
-- taken by threadWaitRead and threadWaitWrite.
#ifdef mingw32_HOST_OS foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
-- | Run an action in a freshly forked thread and wait for its outcome.
--
-- The worker is forked under 'mask_' and stores either the action's result
-- or the exception it raised in an 'MVar'.  The caller blocks on that 'MVar'
-- and then returns the result or rethrows the exception.
--
-- Every exception type is captured, not only 'IOException': if the worker
-- died without filling the 'MVar', the caller would block forever.
-- Exceptions are rethrown unchanged, so callers catching 'IOException' are
-- unaffected.
withThread :: IO a -> IO a
withThread io = do
  m <- newEmptyMVar
  _ <- mask_ $ forkIO $ try io >>= putMVar m
  outcome <- takeMVar m
  either (\e -> throwIO (e :: SomeException)) return outcome
-- | Call the C routine @fdReady@ for the given file descriptor and raise an
-- errno-based exception, labelled @"fdReady"@, if it returns -1.
--
-- The second argument is passed straight through: 'threadWaitRead' supplies
-- 0 and 'threadWaitWrite' supplies 1.  The remaining two arguments are fixed
-- to 'iNFINITE' and 0.
-- NOTE(review): presumably those are a timeout and an is-socket flag --
-- confirm against the RTS definition of fdReady.
waitFd :: Fd -> CInt -> IO ()
waitFd fd write =
  throwErrnoIfMinus1_ "fdReady" (fdReady (fromIntegral fd) write iNFINITE 0)
-- | Value passed by 'waitFd' as the third argument of @fdReady@.
-- NOTE(review): the literal does not fit a signed 32-bit 'CInt', so on such
-- platforms it presumably wraps to -1 -- confirm that is what fdReady
-- expects for "no timeout".
iNFINITE :: CInt
iNFINITE = 0xFFFFFFFF
-- C routine "fdReady", imported as a safe foreign call taking four CInt
-- arguments.  waitFd treats a result of -1 as an errno failure.
foreign import ccall safe "fdReady" fdReady :: CInt -> CInt -> CInt -> CInt -> IO CInt #endif
#endif /* GLASGOW_HASKELL */