{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE RecordWildCards #-}
-- |
-- Module      : Control.Scheduler.Global
-- Copyright   : (c) Alexey Kuleshevich 2020
-- License     : BSD3
-- Maintainer  : Alexey Kuleshevich <lehins@yandex.ru>
-- Stability   : experimental
-- Portability : non-portable
--
module Control.Scheduler.Global
  ( GlobalScheduler
  , globalScheduler
  , newGlobalScheduler
  , withGlobalScheduler_
  ) where

import Control.Concurrent (ThreadId)
import Control.Concurrent.MVar
import Control.Exception
import Control.Monad
import Control.Monad.IO.Unlift
import Control.Monad.ST
import Control.Monad.Primitive
import Control.Scheduler
import Control.Scheduler.Internal
import Control.Scheduler.Types
import Data.IORef
import Data.Maybe
import System.IO.Unsafe (unsafePerformIO)

-- | Global scheduler with `Par` computation strategy that can be used anytime using
-- `withGlobalScheduler_`
globalScheduler :: GlobalScheduler
globalScheduler = unsafePerformIO (newGlobalScheduler Par)
-- This is a top-level value whose initialization is an IO action run through
-- `unsafePerformIO`. The NOINLINE pragma keeps GHC from inlining the definition
-- at use sites, which could otherwise cause the initialization to be duplicated.
{-# NOINLINE globalScheduler #-}


-- | Set up a fresh scheduler for the supplied computation strategy and hand it,
-- together with the thread ids of its freshly spawned workers, to the continuation.
--
-- The scheduler state comes from `initScheduler`, jobs are scheduled with
-- `scheduleJobs_` and the result collector always yields an empty list. Workers are
-- spawned under `safeBracketOnError` with `terminateWorkers` as the cleanup action
-- (presumably, as the name suggests, the cleanup is only invoked when the continuation
-- fails, so that on success the workers stay alive for the caller to manage).
initGlobalScheduler ::
     MonadUnliftIO m => Comp -> (Scheduler RealWorld a -> [ThreadId] -> m b) -> m b
initGlobalScheduler comp action =
  withRunInIO $ \run -> do
    (jobs, mkScheduler) <- initScheduler comp scheduleJobs_ (const (pure []))
    safeBracketOnError (spawnWorkers jobs comp) terminateWorkers $ \tids ->
      run (action (mkScheduler tids) tids)


-- | Create a new global scheduler, in case a single one `globalScheduler` is not
-- sufficient. It is important to note that too much parallelization can significantly
-- degrade performance, therefore it is best not to use more than one scheduler at a time.
--
-- The scheduler is stored in an `MVar` and the thread ids of its workers in an
-- `IORef`. A finalizer is attached to the `MVar` with `mkWeakMVar`; when it runs it
-- reads whatever thread ids are currently stored in the `IORef` and terminates those
-- workers.
--
-- @since 1.5.0
newGlobalScheduler :: MonadIO m => Comp -> m GlobalScheduler
newGlobalScheduler comp =
  liftIO $
  initGlobalScheduler comp $ \scheduler tids -> do
    mvar <- newMVar scheduler
    tidsRef <- newIORef tids
    -- The thread ids are read at finalization time rather than captured here, so the
    -- finalizer sees the contents of the reference as they are when it fires.
    _ <- mkWeakMVar mvar (readIORef tidsRef >>= terminateWorkers)
    pure
      GlobalScheduler
        { globalSchedulerComp = comp
        , globalSchedulerMVar = mvar
        , globalSchedulerThreadIdsRef = tidsRef
        }

-- | Use the global scheduler if it is not busy, otherwise initialize a temporary one. It
-- means that this function by itself will not block, but if the same global scheduler
-- is used concurrently other schedulers might get created.
--
-- The result of the supplied action is discarded. After the action returns, the
-- function waits for the current batch of jobs to finish, unless the scheduler has
-- already recorded early results. If the action throws, or early results were recorded,
-- a brand new scheduler is initialized and stored in place of the old one; otherwise
-- the same scheduler is put back for the next caller.
--
-- @since 1.5.0
withGlobalScheduler_ :: MonadUnliftIO m => GlobalScheduler -> (Scheduler RealWorld () -> m a) -> m ()
withGlobalScheduler_ GlobalScheduler {..} action =
  withRunInIO $ \run -> do
    let -- Replace the global scheduler with a fresh one: swap in the new worker
        -- thread ids, terminate the previous workers and only then publish the new
        -- scheduler through the MVar.
        initializeNewScheduler =
          initGlobalScheduler globalSchedulerComp $ \scheduler tids ->
            liftIO $ do
              oldTids <-
                atomicModifyIORef' globalSchedulerThreadIdsRef $ \prevTids -> (tids, prevTids)
              terminateWorkers oldTids
              putMVar globalSchedulerMVar scheduler
    -- Asynchronous exceptions are masked while the MVar is being taken and restored
    -- to; they are unmasked only around the user action and the wait that follows it.
    mask $ \restore ->
      tryTakeMVar globalSchedulerMVar >>= \case
        -- The global scheduler is in use by someone else: do not block, run the
        -- action on a temporary scheduler instead.
        Nothing -> restore $ run $ withScheduler_ globalSchedulerComp action
        Just scheduler -> do
          let runScheduler = do
                _ <- run $ action scheduler
                mEarly <- stToPrim (_earlyResults scheduler)
                -- Only wait for the batch when no early results were recorded.
                mEarly <$
                  when (isNothing mEarly) (void (stToPrim (_waitForCurrentBatch scheduler)))
          mEarly <- restore runScheduler `onException` run initializeNewScheduler
          -- Whenever a scheduler is terminated it is no longer usable, need to re-initialize
          case mEarly of
            Nothing -> putMVar globalSchedulerMVar scheduler
            Just _ -> run initializeNewScheduler