(original) (raw)
{-# LANGUAGE Trustworthy #-} {-# LANGUAGE CPP , MagicHash , UnboxedTuples , ScopedTypeVariables , RankNTypes #-} {-# OPTIONS_GHC -Wno-deprecations #-}
module Control.Concurrent (
[ThreadId](GHC.Conc.Sync.html#ThreadId),
[myThreadId](GHC.Conc.Sync.html#myThreadId),
[forkIO](GHC.Conc.Sync.html#forkIO),
[forkFinally](Control.Concurrent.html#forkFinally),
[forkIOWithUnmask](GHC.Conc.Sync.html#forkIOWithUnmask),
[killThread](GHC.Conc.Sync.html#killThread),
[throwTo](GHC.Conc.Sync.html#throwTo),
[forkOn](GHC.Conc.Sync.html#forkOn),
[forkOnWithUnmask](GHC.Conc.Sync.html#forkOnWithUnmask),
[getNumCapabilities](GHC.Conc.Sync.html#getNumCapabilities),
[setNumCapabilities](GHC.Conc.Sync.html#setNumCapabilities),
[threadCapability](GHC.Conc.Sync.html#threadCapability),
[yield](GHC.Conc.Sync.html#yield),
[threadDelay](GHC.Conc.IO.html#threadDelay),
[threadWaitRead](Control.Concurrent.html#threadWaitRead),
[threadWaitWrite](Control.Concurrent.html#threadWaitWrite),
[threadWaitReadSTM](Control.Concurrent.html#threadWaitReadSTM),
[threadWaitWriteSTM](Control.Concurrent.html#threadWaitWriteSTM),
module [Control.Concurrent.MVar](Control.Concurrent.MVar.html),
module [Control.Concurrent.Chan](Control.Concurrent.Chan.html),
module [Control.Concurrent.QSem](Control.Concurrent.QSem.html),
module [Control.Concurrent.QSemN](Control.Concurrent.QSemN.html),
[rtsSupportsBoundThreads](Control.Concurrent.html#rtsSupportsBoundThreads),
[forkOS](Control.Concurrent.html#forkOS),
[forkOSWithUnmask](Control.Concurrent.html#forkOSWithUnmask),
[isCurrentThreadBound](Control.Concurrent.html#isCurrentThreadBound),
[runInBoundThread](Control.Concurrent.html#runInBoundThread),
[runInUnboundThread](Control.Concurrent.html#runInUnboundThread),
[mkWeakThreadId](GHC.Conc.Sync.html#mkWeakThreadId),
) whereimport Control.Exception.Base as Exception
import GHC.Conc hiding (threadWaitRead, threadWaitWrite, threadWaitReadSTM, threadWaitWriteSTM) import GHC.IO ( unsafeUnmask, catchException ) import GHC.IORef ( newIORef, readIORef, writeIORef ) import GHC.Base
import System.Posix.Types ( Fd ) import Foreign.StablePtr import Foreign.C.Types
#if defined(mingw32_HOST_OS) import Foreign.C import System.IO import Data.Functor ( void ) import Data.Int ( Int64 ) #else import qualified GHC.Conc #endif
import Control.Concurrent.MVar import Control.Concurrent.Chan import Control.Concurrent.QSem import Control.Concurrent.QSemN
forkFinally :: IO a -> (Either SomeException a -> IO ()) -> IO ThreadId forkFinally :: forall a. IO a -> (Either SomeException a -> IO ()) -> IO ThreadId forkFinally IO a action Either SomeException a -> IO () and_then = ((forall a. IO a -> IO a) -> IO ThreadId) -> IO ThreadId forall b. ((forall a. IO a -> IO a) -> IO b) -> IO b mask (((forall a. IO a -> IO a) -> IO ThreadId) -> IO ThreadId) -> ((forall a. IO a -> IO a) -> IO ThreadId) -> IO ThreadId forall a b. (a -> b) -> a -> b $ \forall a. IO a -> IO a restore -> IO () -> IO ThreadId forkIO (IO () -> IO ThreadId) -> IO () -> IO ThreadId forall a b. (a -> b) -> a -> b $ IO a -> IO (Either SomeException a) forall e a. Exception e => IO a -> IO (Either e a) try (IO a -> IO a forall a. IO a -> IO a restore IO a action) IO (Either SomeException a) -> (Either SomeException a -> IO ()) -> IO () forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b >>= Either SomeException a -> IO () and_then
foreign import ccall unsafe rtsSupportsBoundThreads :: Bool
forkOS :: IO () -> IO ThreadId
foreign export ccall forkOS_entry :: StablePtr (IO ()) -> IO ()
foreign import ccall "forkOS_entry" forkOS_entry_reimported :: StablePtr (IO ()) -> IO ()
forkOS_entry :: StablePtr (IO ()) -> IO () forkOS_entry :: StablePtr (IO ()) -> IO () forkOS_entry StablePtr (IO ()) stableAction = do IO () action <- StablePtr (IO ()) -> IO (IO ()) forall a. StablePtr a -> IO a deRefStablePtr StablePtr (IO ()) stableAction IO () action
foreign import ccall forkOS_createThread :: StablePtr (IO ()) -> IO CInt
failNonThreaded :: IO a failNonThreaded :: forall a. IO a failNonThreaded = String -> IO a forall (m :: * -> *) a. MonadFail m => String -> m a fail (String -> IO a) -> String -> IO a forall a b. (a -> b) -> a -> b $ String "RTS doesn't support multiple OS threads " String -> String -> String forall a. [a] -> [a] -> [a] ++String "(use ghc -threaded when linking)"
forkOS :: IO () -> IO ThreadId forkOS IO () action0 | Bool rtsSupportsBoundThreads = do MVar ThreadId mv <- IO (MVar ThreadId) forall a. IO (MVar a) newEmptyMVar MaskingState b <- IO MaskingState Exception.getMaskingState let
action1 :: IO ()action1 = case MaskingState b of MaskingState Unmasked -> IO () -> IO () forall a. IO a -> IO a unsafeUnmask IO () action0 MaskingState MaskedInterruptible -> IO () action0 MaskingState MaskedUninterruptible -> IO () -> IO () forall a. IO a -> IO a uninterruptibleMask_ IO () action0
action_plus :: IO ()action_plus = IO () -> (SomeException -> IO ()) -> IO () forall e a. Exception e => IO a -> (e -> IO a) -> IO a catch IO () action1 SomeException -> IO () childHandler
StablePtr (IO ())entry <- IO () -> IO (StablePtr (IO ())) forall a. a -> IO (StablePtr a) newStablePtr (IO ThreadId myThreadId IO ThreadId -> (ThreadId -> IO ()) -> IO () forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b >>= MVar ThreadId -> ThreadId -> IO () forall a. MVar a -> a -> IO () putMVar MVar ThreadId mv IO () -> IO () -> IO () forall (m :: * -> *) a b. Monad m => m a -> m b -> m b >> IO () action_plus) CInt err <- StablePtr (IO ()) -> IO CInt forkOS_createThread StablePtr (IO ()) entry Bool -> IO () -> IO () forall (f :: * -> *). Applicative f => Bool -> f () -> f () when (CInt err CInt -> CInt -> Bool forall a. Eq a => a -> a -> Bool /= CInt 0) (IO () -> IO ()) -> IO () -> IO () forall a b. (a -> b) -> a -> b $ String -> IO () forall (m :: * -> *) a. MonadFail m => String -> m a fail String "Cannot create OS thread." ThreadId tid <- MVar ThreadId -> IO ThreadId forall a. MVar a -> IO a takeMVar MVar ThreadId mv StablePtr (IO ()) -> IO () forall a. StablePtr a -> IO () freeStablePtr StablePtr (IO ()) entry ThreadId -> IO ThreadId forall (m :: * -> *) a. Monad m => a -> m a return ThreadId tid | Bool otherwise = IO ThreadId forall a. IO a failNonThreaded
forkOSWithUnmask :: ((forall a . IO a -> IO a) -> IO ()) -> IO ThreadId forkOSWithUnmask :: ((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId forkOSWithUnmask (forall a. IO a -> IO a) -> IO () io = IO () -> IO ThreadId forkOS ((forall a. IO a -> IO a) -> IO () io forall a. IO a -> IO a unsafeUnmask)
isCurrentThreadBound :: IO Bool isCurrentThreadBound :: IO Bool isCurrentThreadBound = (State# RealWorld -> (# State# RealWorld, Bool #)) -> IO Bool forall a. (State# RealWorld -> (# State# RealWorld, a #)) -> IO a IO ((State# RealWorld -> (# State# RealWorld, Bool #)) -> IO Bool) -> (State# RealWorld -> (# State# RealWorld, Bool #)) -> IO Bool forall a b. (a -> b) -> a -> b $ \ State# RealWorld s# -> case State# RealWorld -> (# State# RealWorld, Int# #) isCurrentThreadBound# State# RealWorld s# of (# State# RealWorld s2#, Int# flg #) -> (# State# RealWorld s2#, Int# -> Bool isTrue# (Int# flg Int# -> Int# -> Int# /=# Int# 0#) #)
runInBoundThread :: IO a -> IO a
runInBoundThread :: forall a. IO a -> IO a runInBoundThread IO a action | Bool rtsSupportsBoundThreads = do Bool bound <- IO Bool isCurrentThreadBound if Bool bound then IO a action else do IORef (Either SomeException a) ref <- Either SomeException a -> IO (IORef (Either SomeException a)) forall a. a -> IO (IORef a) newIORef Either SomeException a forall a. HasCallStack => a undefined let action_plus :: IO () action_plus = IO a -> IO (Either SomeException a) forall e a. Exception e => IO a -> IO (Either e a) Exception.try IO a action IO (Either SomeException a) -> (Either SomeException a -> IO ()) -> IO () forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b >>= IORef (Either SomeException a) -> Either SomeException a -> IO () forall a. IORef a -> a -> IO () writeIORef IORef (Either SomeException a) ref IO (StablePtr (IO ())) -> (StablePtr (IO ()) -> IO ()) -> (StablePtr (IO ()) -> IO (Either SomeException a)) -> IO (Either SomeException a) forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c bracket (IO () -> IO (StablePtr (IO ())) forall a. a -> IO (StablePtr a) newStablePtr IO () action_plus) StablePtr (IO ()) -> IO () forall a. StablePtr a -> IO () freeStablePtr (\StablePtr (IO ()) cEntry -> StablePtr (IO ()) -> IO () forkOS_entry_reimported StablePtr (IO ()) cEntry IO () -> IO (Either SomeException a) -> IO (Either SomeException a) forall (m :: * -> *) a b. Monad m => m a -> m b -> m b >> IORef (Either SomeException a) -> IO (Either SomeException a) forall a. IORef a -> IO a readIORef IORef (Either SomeException a) ref) IO (Either SomeException a) -> (Either SomeException a -> IO a) -> IO a forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b >>= Either SomeException a -> IO a forall a. Either SomeException a -> IO a unsafeResult | Bool otherwise = IO a forall a. IO a failNonThreaded
runInUnboundThread :: IO a -> IO a
runInUnboundThread :: forall a. IO a -> IO a
runInUnboundThread IO a
action = do
Bool
bound <- IO Bool
isCurrentThreadBound
if Bool
bound
then do
MVar (Either SomeException a)
mv <- IO (MVar (Either SomeException a))
forall a. IO (MVar a)
newEmptyMVar
((forall a. IO a -> IO a) -> IO a) -> IO a
forall b. ((forall a. IO a -> IO a) -> IO b) -> IO b
mask (((forall a. IO a -> IO a) -> IO a) -> IO a)
-> ((forall a. IO a -> IO a) -> IO a) -> IO a
forall a b. (a -> b) -> a -> b
$ \forall a. IO a -> IO a
restore -> do
ThreadId
tid <- IO () -> IO ThreadId
forkIO (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ IO a -> IO (Either SomeException a)
forall e a. Exception e => IO a -> IO (Either e a)
Exception.try (IO a -> IO a
forall a. IO a -> IO a
restore IO a
action) IO (Either SomeException a)
-> (Either SomeException a -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= MVar (Either SomeException a) -> Either SomeException a -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar (Either SomeException a)
mv
let wait :: IO (Either SomeException a)
wait = MVar (Either SomeException a) -> IO (Either SomeException a)
forall a. MVar a -> IO a
takeMVar MVar (Either SomeException a)
mv IO (Either SomeException a)
-> (SomeException -> IO (Either SomeException a))
-> IO (Either SomeException a)
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
catchException (SomeException
e :: SomeException) ->
ThreadId -> SomeException -> IO ()
forall e. Exception e => ThreadId -> e -> IO ()
Exception.throwTo ThreadId
tid SomeException
e IO () -> IO (Either SomeException a) -> IO (Either SomeException a)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO (Either SomeException a)
wait
IO (Either SomeException a)
wait IO (Either SomeException a)
-> (Either SomeException a -> IO a) -> IO a
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Either SomeException a -> IO a
forall a. Either SomeException a -> IO a
unsafeResult
else IO a
action
unsafeResult :: Either SomeException a -> IO a unsafeResult :: forall a. Either SomeException a -> IO a unsafeResult = (SomeException -> IO a) -> (a -> IO a) -> Either SomeException a -> IO a forall a c b. (a -> c) -> (b -> c) -> Either a b -> c either SomeException -> IO a forall e a. Exception e => e -> IO a Exception.throwIO a -> IO a forall (m :: * -> *) a. Monad m => a -> m a return
threadWaitRead :: Fd -> IO () threadWaitRead :: Fd -> IO () threadWaitRead Fd fd #if defined(mingw32_HOST_OS)
| threaded = withThread (waitFd fd False) | otherwise = case fd of 0 -> do _ <- hWaitForInput stdin (-1) return ()
_ -> errorWithoutStackTrace "threadWaitRead requires -threaded on Windows, or use System.IO.hWaitForInput"#else = Fd -> IO () GHC.Conc.threadWaitRead Fd fd #endif
threadWaitWrite :: Fd -> IO () threadWaitWrite :: Fd -> IO () threadWaitWrite Fd fd #if defined(mingw32_HOST_OS) | threaded = withThread (waitFd fd True) | otherwise = errorWithoutStackTrace "threadWaitWrite requires -threaded on Windows" #else = Fd -> IO () GHC.Conc.threadWaitWrite Fd fd #endif
threadWaitReadSTM :: Fd -> IO (STM (), IO ()) threadWaitReadSTM :: Fd -> IO (STM (), IO ()) threadWaitReadSTM Fd fd #if defined(mingw32_HOST_OS) | threaded = do v <- newTVarIO Nothing mask_ $ void $ forkIO $ do result <- try (waitFd fd False) atomically (writeTVar v $ Just result) let waitAction = do result <- readTVar v case result of Nothing -> retry Just (Right ()) -> return () Just (Left e) -> throwSTM (e :: IOException) let killAction = return () return (waitAction, killAction) | otherwise = errorWithoutStackTrace "threadWaitReadSTM requires -threaded on Windows" #else = Fd -> IO (STM (), IO ()) GHC.Conc.threadWaitReadSTM Fd fd #endif
threadWaitWriteSTM :: Fd -> IO (STM (), IO ()) threadWaitWriteSTM :: Fd -> IO (STM (), IO ()) threadWaitWriteSTM Fd fd #if defined(mingw32_HOST_OS) | threaded = do v <- newTVarIO Nothing mask_ $ void $ forkIO $ do result <- try (waitFd fd True) atomically (writeTVar v $ Just result) let waitAction = do result <- readTVar v case result of Nothing -> retry Just (Right ()) -> return () Just (Left e) -> throwSTM (e :: IOException) let killAction = return () return (waitAction, killAction) | otherwise = errorWithoutStackTrace "threadWaitWriteSTM requires -threaded on Windows" #else = Fd -> IO (STM (), IO ()) GHC.Conc.threadWaitWriteSTM Fd fd #endif
#if defined(mingw32_HOST_OS) foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
withThread :: IO a -> IO a withThread io = do m <- newEmptyMVar _ <- mask_ $ forkIO $ try io >>= putMVar m x <- takeMVar m case x of Right a -> return a Left e -> throwIO (e :: IOException)
waitFd :: Fd -> Bool -> IO () waitFd fd write = do throwErrnoIfMinus1_ "fdReady" $ fdReady (fromIntegral fd) (if write then 1 else 0) (-1) 0
foreign import ccall safe "fdReady" fdReady :: CInt -> CBool -> Int64 -> CBool -> IO CInt #endif