{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE RecordWildCards #-}
module Control.Scheduler.Global
( GlobalScheduler
, globalScheduler
, newGlobalScheduler
, withGlobalScheduler_
) where
import Control.Concurrent (ThreadId)
import Control.Concurrent.MVar
import Control.Exception
import Control.Monad
import Control.Monad.IO.Unlift
import Control.Monad.ST
import Control.Monad.Primitive
import Control.Scheduler
import Control.Scheduler.Internal
import Control.Scheduler.Types
import Data.IORef
import Data.Maybe
import System.IO.Unsafe (unsafePerformIO)
-- | A shared 'GlobalScheduler' built with the 'Par' computation strategy.
--
-- The value is a top-level constant produced by 'unsafePerformIO', so the
-- underlying 'newGlobalScheduler' call runs lazily, the first time the value is
-- demanded. The @NOINLINE@ pragma is required for that to be sound: if GHC were
-- to inline the definition at its use sites, the 'IO' action could be executed
-- more than once and yield several distinct schedulers.
globalScheduler :: GlobalScheduler
globalScheduler = unsafePerformIO (newGlobalScheduler Par)
{-# NOINLINE globalScheduler #-}
-- | Initialise a scheduler for the given 'Comp', spawn its workers, and pass
-- both the scheduler and the workers' 'ThreadId's to the continuation.
--
-- The scheduler is created with 'scheduleJobs_' as the scheduling function and
-- a collector that always yields an empty list, i.e. job results are not
-- gathered.
--
-- Workers are spawned under 'safeBracketOnError' with 'terminateWorkers' as the
-- clean-up action.
-- NOTE(review): presumably, like @bracketOnError@, the clean-up only runs when
-- the continuation fails, leaving the workers alive on success - confirm
-- against "Control.Scheduler.Internal".
initGlobalScheduler ::
     MonadUnliftIO m => Comp -> (Scheduler RealWorld a -> [ThreadId] -> m b) -> m b
initGlobalScheduler comp action =
  withRunInIO $ \run -> do
    (jobs, mkScheduler) <- initScheduler comp scheduleJobs_ (const (pure []))
    safeBracketOnError (spawnWorkers jobs comp) terminateWorkers $ \tids ->
      run (action (mkScheduler tids) tids)
-- | Create a fresh 'GlobalScheduler' that uses the supplied 'Comp'.
--
-- The scheduler obtained from 'initGlobalScheduler' is stored in a new 'MVar'
-- and the worker 'ThreadId's in a new 'IORef'; both are returned inside the
-- 'GlobalScheduler' record together with the 'Comp'.
--
-- A finalizer is attached to the 'MVar' with 'mkWeakMVar'. When it runs, it
-- reads whatever thread ids are currently in the 'IORef' and passes them to
-- 'terminateWorkers', so workers that were swapped in later are the ones that
-- get cleaned up.
newGlobalScheduler :: MonadIO m => Comp -> m GlobalScheduler
newGlobalScheduler comp =
  liftIO $ initGlobalScheduler comp $ \scheduler tids -> do
    mvar <- newMVar scheduler
    tidsRef <- newIORef tids
    -- The weak pointer itself is not needed, only the attached finalizer.
    _ <- mkWeakMVar mvar (readIORef tidsRef >>= terminateWorkers)
    pure $
      GlobalScheduler
        { globalSchedulerComp = comp
        , globalSchedulerMVar = mvar
        , globalSchedulerThreadIdsRef = tidsRef
        }
-- | Run an action with the scheduler held by the given 'GlobalScheduler' if it
-- is currently available, and with a temporary scheduler otherwise.
--
-- The scheduler is claimed with 'tryTakeMVar', so this function never waits
-- for a busy global scheduler: when the 'MVar' is empty the action is instead
-- run through 'withScheduler_' using the same 'Comp'.
--
-- The value returned by the action is discarded.
withGlobalScheduler_ ::
     MonadUnliftIO m => GlobalScheduler -> (Scheduler RealWorld () -> m a) -> m ()
withGlobalScheduler_ GlobalScheduler {..} action =
  withRunInIO $ \run -> do
    -- Replace the scheduler stored in the 'MVar' with a brand new one: record
    -- the new worker ids, terminate the previous workers, then publish the new
    -- scheduler. Only called while the 'MVar' is empty (we hold its contents).
    let initializeNewScheduler =
          initGlobalScheduler globalSchedulerComp $ \scheduler tids ->
            liftIO $ do
              oldTids <- atomicModifyIORef' globalSchedulerThreadIdsRef $ (,) tids
              terminateWorkers oldTids
              putMVar globalSchedulerMVar scheduler
    -- Asynchronous exceptions are masked so that taking the scheduler and
    -- putting one back cannot be torn apart; only the user action and the wait
    -- for its batch run with exceptions restored.
    mask $ \restore ->
      tryTakeMVar globalSchedulerMVar >>= \case
        -- Global scheduler is in use elsewhere: fall back to a temporary one.
        Nothing -> restore $ run $ withScheduler_ globalSchedulerComp action
        Just scheduler -> do
          let runScheduler = do
                _ <- run $ action scheduler
                mEarly <- stToPrim (_earlyResults scheduler)
                -- Wait for the current batch only when there are no early
                -- results; either way, return the early results.
                mEarly <$
                  when (isNothing mEarly) (void (stToPrim (_waitForCurrentBatch scheduler)))
          -- On any exception the taken scheduler is not put back; a new one is
          -- initialised and stored before the exception propagates.
          mEarly <- restore runScheduler `onException` run initializeNewScheduler
          case mEarly of
            -- No early results: the same scheduler is returned to the 'MVar'.
            Nothing -> putMVar globalSchedulerMVar scheduler
            -- NOTE(review): presumably early results mean the scheduler was
            -- terminated and cannot be reused, hence the re-initialisation -
            -- confirm against the scheduler internals.
            Just _ -> run initializeNewScheduler