-- Control/Concurrent.hs (original) (raw)

module Control.Concurrent (

    -- Thread identity and creation.
    ThreadId,
#ifdef GLASGOW_HASKELL
    myThreadId,
#endif

    forkIO,
#ifdef GLASGOW_HASKELL
    forkFinally,
    forkIOWithUnmask,
    killThread,
    throwTo,
#endif

    forkOn,
    forkOnWithUnmask,
    getNumCapabilities,
    setNumCapabilities,
    threadCapability,

    yield,

#ifdef GLASGOW_HASKELL
    threadDelay,
    threadWaitRead,
    threadWaitWrite,
#endif

    -- Modules re-exported wholesale.
    module Control.Concurrent.MVar,
    module Control.Concurrent.Chan,
    module Control.Concurrent.QSem,
    module Control.Concurrent.QSemN,
    module Control.Concurrent.SampleVar,

    -- The list-merging helpers are compiled out for Hugs.
#ifndef HUGS
    mergeIO,
    nmergeIO,
#endif

#ifdef GLASGOW_HASKELL
    rtsSupportsBoundThreads,
    forkOS,
    isCurrentThreadBound,
    runInBoundThread,
    runInUnboundThread,
#endif

    mkWeakThreadId,

    forkIOUnmasked

) where

import Prelude

import Control.Exception.Base as Exception

#ifdef GLASGOW_HASKELL import GHC.Exception import GHC.Conc hiding (threadWaitRead, threadWaitWrite) import qualified GHC.Conc import GHC.IO ( IO(..), unsafeInterleaveIO, unsafeUnmask ) import GHC.IORef ( newIORef, readIORef, writeIORef ) import GHC.Base

import System.Posix.Types ( Fd ) import Foreign.StablePtr import Foreign.C.Types import Control.Monad ( when )

#ifdef mingw32_HOST_OS import Foreign.C import System.IO #endif #endif

#ifdef HUGS import Hugs.ConcBase #endif

import Control.Concurrent.MVar import Control.Concurrent.Chan import Control.Concurrent.QSem import Control.Concurrent.QSemN import Control.Concurrent.SampleVar

#ifdef HUGS
-- | Under Hugs, thread identifiers carry no information and are
-- represented by the unit type.
type ThreadId = ()
#endif

-- | Fork a thread that runs @action@ and then hands its outcome to
-- @and_then@: 'Right' with the result when @action@ returns, 'Left' with
-- the exception when it throws.
--
-- The thread is forked inside 'mask', and only @action@ itself is run
-- under @restore@ (the masking state of the caller); the call to
-- @and_then@ therefore runs with asynchronous exceptions masked.
forkFinally :: IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally action and_then =
  mask $ \restore ->
    forkIO (try (restore action) >>= and_then)

#ifndef HUGS
-- | Initial quantity of the 'QSem' created by 'mergeIO' and 'nmergeIO';
-- every element written to the merged list first waits on that semaphore.
max_buff_size :: Int
max_buff_size = 1

mergeIO :: [a] -> [a] -> IO [a]
nmergeIO :: [[a]] -> IO [a]

-- | Merge two lists concurrently.
--
-- One 'suckIO' worker thread is forked per input list; both append to a
-- shared output list whose first cell is @tail_node@.  The returned list
-- is the content of that first cell, so its elements appear in whatever
-- order the two workers deliver them.
mergeIO ls rs = do
    tail_node        <- newEmptyMVar
    tail_list        <- newMVar tail_node
    e                <- newQSem max_buff_size
    branches_running <- newMVar 2          -- two producers: ls and rs
    let buff = (tail_list, e)
    _   <- forkIO (suckIO branches_running buff ls)
    _   <- forkIO (suckIO branches_running buff rs)
    val <- takeMVar tail_node
    -- The first cell has been consumed: give its semaphore unit back so a
    -- producer may write the next element.
    signalQSem e
    return val

-- | Shared state of a merge: an 'MVar' holding the current (still empty)
-- tail cell of the output list, and the semaphore producers wait on before
-- writing an element.
type Buffer a = (MVar (MVar [a]), QSem)

-- | Worker loop that feeds one input list into the shared output list.
--
-- * On an exhausted input the worker takes the count of running branches.
--   If it was the last one (count 1) it terminates the output list by
--   writing @[]@ into the current tail cell; otherwise it stores the
--   decremented count back.
-- * On @x:xs@ it waits on the semaphore, writes @x@ into the current tail
--   cell together with a lazily-read reference to a fresh tail cell,
--   installs that fresh cell as the new tail and continues with @xs@.
suckIO :: MVar Int -> Buffer a -> [a] -> IO ()
suckIO branches_running buff@(tail_list, e) vs =
    case vs of
      [] -> do
        val <- takeMVar branches_running
        if val == 1
          then do
            -- Last producer: close the output list.  The counter is
            -- deliberately not put back, exactly as before.
            node <- takeMVar tail_list
            putMVar node []
            putMVar tail_list node
          else
            -- Other producers are still running: record that this one
            -- has finished.
            putMVar branches_running (val - 1)
      (x:xs) -> do
        waitQSem e
        node      <- takeMVar tail_list
        next_node <- newEmptyMVar
        -- The rest of the list is only read (and the semaphore only
        -- signalled) when the consumer forces it.
        next_node_val <- unsafeInterleaveIO $ do
            y <- takeMVar next_node
            signalQSem e
            return y
        putMVar node (x : next_node_val)
        putMVar tail_list next_node
        suckIO branches_running buff xs

-- | Merge any number of lists concurrently, forking one 'suckIO' worker
-- per input list.
--
-- With no input lists there is no worker that could ever fill the first
-- output cell, so waiting on it would block forever; that case returns
-- the empty list directly.
nmergeIO [] = return []
nmergeIO lss = do
    let len = length lss
    tail_node        <- newEmptyMVar
    tail_list        <- newMVar tail_node
    e                <- newQSem max_buff_size
    branches_running <- newMVar len        -- one producer per input list
    let buff = (tail_list, e)
    mapM_ (forkIO . suckIO branches_running buff) lss
    val <- takeMVar tail_node
    -- The first cell has been consumed: give its semaphore unit back.
    signalQSem e
    return val
#endif /* HUGS */

#ifdef GLASGOW_HASKELL

-- | Boolean obtained from the C symbol @rtsSupportsBoundThreads@.  When it
-- is 'False', 'forkOS' and 'runInBoundThread' do not run their argument
-- and call 'failNonThreaded' instead, whose message asks for linking with
-- @-threaded@.
foreign import ccall rtsSupportsBoundThreads :: Bool

-- | Run an action in a new thread created through 'forkOS_createThread'
-- and return that thread's 'ThreadId'.  Fails via 'failNonThreaded' when
-- 'rtsSupportsBoundThreads' is 'False'.
forkOS :: IO () -> IO ThreadId

-- 'forkOS_entry' is exported to C under its own name ...
foreign export ccall forkOS_entry
    :: StablePtr (IO ()) -> IO ()

-- ... and imported back as a foreign call, which 'runInBoundThread' uses
-- to run an action by going through the C entry point.
foreign import ccall "forkOS_entry" forkOS_entry_reimported
    :: StablePtr (IO ()) -> IO ()

-- | Dereference the stable pointer and run the 'IO' action it refers to.
-- The pointer is not freed here; that is left to the caller.
forkOS_entry :: StablePtr (IO ()) -> IO ()
forkOS_entry stableAction = do
    action <- deRefStablePtr stableAction
    action

-- | C routine that 'forkOS' calls with a stable pointer to the action the
-- new thread should run.  'forkOS' treats any non-zero result as failure
-- ("Cannot create OS thread.").
foreign import ccall forkOS_createThread :: StablePtr (IO ()) -> IO CInt

-- | Common failure used by the bound-thread operations when
-- 'rtsSupportsBoundThreads' is 'False'.
failNonThreaded :: IO a
failNonThreaded = fail $ "RTS doesn't support multiple OS threads "
                       ++"(use ghc -threaded when linking)"

forkOS action0
    | rtsSupportsBoundThreads = do
        mv <- newEmptyMVar
        b <- Exception.getMaskingState
        let
            -- Reproduce the caller's masking state inside the child:
            -- an unmasked caller gets an explicitly unmasked child, an
            -- uninterruptibly masked caller gets 'uninterruptibleMask_'.
            -- (Presumably the new thread starts out masked, which is why
            -- the interruptibly-masked case needs no wrapper -- confirm.)
            action1 = case b of
                        Unmasked -> unsafeUnmask action0
                        MaskedInterruptible -> action0
                        MaskedUninterruptible -> uninterruptibleMask_ action0

            -- Exceptions escaping the action are passed to 'childHandler'.
            action_plus = Exception.catch action1 childHandler

        -- The child first reports its own ThreadId through mv, then runs
        -- the wrapped action.
        entry <- newStablePtr (myThreadId >>= putMVar mv >> action_plus)
        err <- forkOS_createThread entry
        -- NOTE(review): on this failure path 'entry' is never freed.
        when (err /= 0) $ fail "Cannot create OS thread."
        tid <- takeMVar mv
        -- The child has started (it filled mv), so the stable pointer has
        -- already been dereferenced and can be released.
        freeStablePtr entry
        return tid
    | otherwise = failNonThreaded

-- | Ask the runtime, through the @isCurrentThreadBound#@ primitive,
-- whether the calling thread is bound.  The primitive yields an unboxed
-- integer flag; any value other than @0#@ is reported as 'True'.
isCurrentThreadBound :: IO Bool
isCurrentThreadBound = IO $ \ s# ->
    case isCurrentThreadBound# s# of
        (# s2#, flg #) -> (# s2#, not (flg ==# 0#) #)

-- | Run an action in a bound thread and return its result.
--
-- * Without bound-thread support in the RTS this fails via
--   'failNonThreaded'.
-- * If the calling thread is already bound, the action simply runs in it.
-- * Otherwise the action is run through 'forkOS_entry_reimported'; its
--   outcome (result or exception, captured with 'Exception.try') travels
--   back through an 'IORef' and is re-raised or returned by
--   'unsafeResult'.
runInBoundThread :: IO a -> IO a
runInBoundThread action
    | rtsSupportsBoundThreads = do
        bound <- isCurrentThreadBound
        if bound
            then action
            else do
                -- Placeholder contents; action_plus overwrites them
                -- before the reference is read below.
                ref <- newIORef undefined
                let action_plus = Exception.try action >>= writeIORef ref
                -- 'bracket' guarantees the stable pointer is freed even
                -- if the foreign call is interrupted.
                bracket (newStablePtr action_plus)
                        freeStablePtr
                        (\cEntry -> forkOS_entry_reimported cEntry >> readIORef ref) >>=
                  unsafeResult
    | otherwise = failNonThreaded

-- | Run an action in an unbound thread and return its result.
--
-- If the calling thread is not bound, the action runs in place.
-- Otherwise it is forked with 'forkIO' and the caller waits for its
-- outcome; any exception delivered to the waiting caller is forwarded to
-- the forked thread with 'Exception.throwTo', after which the caller
-- resumes waiting.  The outcome is finally returned or re-raised by
-- 'unsafeResult'.
--
-- The lambda below uses a pattern type signature, which needs the
-- @ScopedTypeVariables@ extension to be enabled for this file.
runInUnboundThread :: IO a -> IO a
runInUnboundThread action = do
    bound <- isCurrentThreadBound
    if bound
        then do
            mv <- newEmptyMVar
            mask $ \restore -> do
                -- The child runs the action with the caller's original
                -- masking state and always reports an outcome in mv.
                tid <- forkIO $ Exception.try (restore action) >>= putMVar mv
                let wait = Exception.catch (takeMVar mv)
                               (\(e :: SomeException) ->
                                    Exception.throwTo tid e >> wait)
                wait >>= unsafeResult
        else action

-- | Turn a captured outcome back into an 'IO' action: a 'Left' exception
-- is re-thrown with 'Exception.throwIO', a 'Right' value is returned.
unsafeResult :: Either SomeException a -> IO a
unsafeResult = either Exception.throwIO return
#endif /* GLASGOW_HASKELL */

#ifdef GLASGOW_HASKELL

-- | Block the calling thread until the given file descriptor can be read.
--
-- On Windows (@mingw32_HOST_OS@):
--
-- * with a threaded RTS the wait is done by 'waitFd' in a helper thread
--   (see 'withThread');
-- * otherwise only descriptor 0 is supported, by waiting on 'stdin' with
--   'hWaitForInput'; any other descriptor is an 'error'.
--
-- Elsewhere this is "GHC.Conc"'s @threadWaitRead@.
threadWaitRead :: Fd -> IO ()
threadWaitRead fd
#ifdef mingw32_HOST_OS
  | threaded  = withThread (waitFd fd 0)
  | otherwise = case fd of
                  -- A negative timeout makes hWaitForInput wait without a
                  -- time limit; a positive one would return after that
                  -- many milliseconds whether or not input had arrived.
                  0 -> do _ <- hWaitForInput stdin (-1)
                          return ()
                  _ -> error "threadWaitRead requires -threaded on Windows, or use System.IO.hWaitForInput"
#else
  = GHC.Conc.threadWaitRead fd
#endif

-- | Block the calling thread until the given file descriptor can be
-- written to.
--
-- On Windows (@mingw32_HOST_OS@) this needs a threaded RTS, in which case
-- the wait is done by 'waitFd' in a helper thread (see 'withThread');
-- without it the call is an 'error'.  Elsewhere this is "GHC.Conc"'s
-- @threadWaitWrite@.
threadWaitWrite :: Fd -> IO ()
threadWaitWrite fd
#ifdef mingw32_HOST_OS
  | threaded  = withThread (waitFd fd 1)
  | otherwise = error "threadWaitWrite requires -threaded on Windows"
#else
  = GHC.Conc.threadWaitWrite fd
#endif

#ifdef mingw32_HOST_OS
-- | The C symbol @rtsSupportsBoundThreads@ imported under a local name;
-- 'threadWaitRead' and 'threadWaitWrite' use it to decide whether the
-- 'withThread' / 'waitFd' path is available.
foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool

-- | Run an 'IO' action in a freshly forked thread and wait for it.
--
-- The thread is forked under 'mask_'.  Its result is handed back through
-- an 'MVar'; an 'IOException' raised by the action is caught in the
-- helper thread and re-thrown in the caller.  Exceptions of other types
-- are not caught by the 'try' here.
withThread :: IO a -> IO a
withThread io = do
  m <- newEmptyMVar
  _ <- mask_ $ forkIO $ try io >>= putMVar m
  x <- takeMVar m
  case x of
    Right a -> return a
    Left e  -> throwIO (e :: IOException)

-- | Wait on a file descriptor by calling the C function 'fdReady' with
-- 'iNFINITE' as its third argument, throwing an errno-based exception
-- (labelled @"fdReady"@) if the call returns -1.
--
-- @write@ is passed through unchanged as 'fdReady's second argument:
-- 'threadWaitRead' supplies 0 and 'threadWaitWrite' supplies 1.
waitFd :: Fd -> CInt -> IO ()
waitFd fd write = do
   throwErrnoIfMinus1_ "fdReady" $
        fdReady (fromIntegral fd) write iNFINITE 0

-- | Value passed by 'waitFd' as the third argument of 'fdReady'.
--
-- NOTE(review): 0xFFFFFFFF does not fit the positive range of a 32-bit
-- signed 'CInt'; presumably this relies on the literal wrapping around --
-- confirm against the C side of @fdReady@.
iNFINITE :: CInt
iNFINITE = 0xFFFFFFFF

-- | The C function @fdReady@, imported as a @safe@ call.  'waitFd' passes
-- the descriptor, its @write@ flag, 'iNFINITE' and a literal 0, and treats
-- a result of -1 as an errno failure.
foreign import ccall safe "fdReady"
  fdReady :: CInt -> CInt -> CInt -> CInt -> IO CInt
#endif

#endif /* GLASGOW_HASKELL */