{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MonoLocalBinds #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# OPTIONS_HADDOCK hide, not-home #-}
-- |
-- Module      : Control.Scheduler.Internal
-- Copyright   : (c) Alexey Kuleshevich 2018-2020
-- License     : BSD3
-- Maintainer  : Alexey Kuleshevich <lehins@yandex.ru>
-- Stability   : experimental
-- Portability : non-portable
--
module Control.Scheduler.Internal
  ( withSchedulerInternal
  , initWorkerStates
  , withSchedulerWSInternal
  , trivialScheduler_
  , withTrivialSchedulerR
  , withTrivialSchedulerRIO
  , initScheduler
  , spawnWorkers
  , terminateWorkers
  , scheduleJobs
  , scheduleJobs_
  , scheduleJobsWith
  , reverseResults
  , resultsToList
  , traverse_
  , safeBracketOnError
  ) where

import Data.Coerce
import Control.Concurrent
import Control.Exception
import Control.Monad
import Control.Monad.IO.Unlift
import Control.Monad.Primitive
import Control.Scheduler.Computation
import Control.Scheduler.Types
import Control.Scheduler.Queue
import Data.Atomics (atomicModifyIORefCAS, atomicModifyIORefCAS_)
import qualified Data.Foldable as F (foldl')
import Data.IORef
import Data.Primitive.SmallArray
import Data.Primitive.MutVar
import Data.Primitive.PVar



-- | Initialize a separate state for each worker.
--
-- The number of workers is obtained from the supplied computation strategy with
-- `getCompWorkers`. The initializing action is then executed once for every `WorkerId`,
-- sequentially and in increasing order starting at @0@, and the produced states are stored
-- in an immutable array indexed by the worker id.
--
-- The returned `WorkerStates` also carries a freshly allocated mutex that starts out at
-- @0@; `withSchedulerWSInternal` sets it to @1@ while the states are in use.
--
-- @since 1.4.0
initWorkerStates :: MonadIO m => Comp -> (WorkerId -> m ws) -> m (WorkerStates ws)
initWorkerStates comp initState = do
  nWorkers <- getCompWorkers comp
  -- The placeholder is never observed after a successful initialization: the loop below
  -- overwrites every index in @[0, nWorkers)@ before the array is frozen.
  arr <- liftIO $ newSmallArray nWorkers (error "Uninitialized")
  let go i =
        when (i < nWorkers) $ do
          state <- initState (WorkerId i)
          liftIO $ writeSmallArray arr i state
          go (i + 1)
  go 0
  -- Freezing without a copy is fine here, since the mutable array is not touched again.
  workerStates <- liftIO $ unsafeFreezeSmallArray arr
  mutex <- liftIO $ newPVar 0
  pure
    WorkerStates
      { _workerStatesComp = comp
      , _workerStatesArray = workerStates
      , _workerStatesMutex = mutex
      }

-- | Run a scheduler that has access to per-worker states, while making sure that the same
-- `WorkerStates` are not used by more than one scheduler at a time.
--
-- The first argument is the function that actually creates the scheduler for a given
-- `Comp`; it is invoked with the computation strategy stored in the `WorkerStates`. The
-- supplied action receives the scheduler paired up with the states as a `SchedulerWS`.
--
-- The mutex stored in `WorkerStates` is used as a lock flag: it is set to @1@ on entry and
-- reset to @0@ on exit (also when an exception is raised), but only by the call that
-- actually acquired it. If the flag was already set on entry, `MutexException` is thrown
-- and the flag is left untouched.
withSchedulerWSInternal ::
     MonadUnliftIO m
  => (Comp -> (Scheduler RealWorld a -> t) -> m b)
  -> WorkerStates ws
  -> (SchedulerWS ws a -> t)
  -> m b
withSchedulerWSInternal withScheduler' states action =
  withRunInIO $ \run -> bracket lockState unlockState (run . runSchedulerWS)
  where
    mutex = _workerStatesMutex states
    -- Set the flag atomically. The result is interpreted below as the value the flag had
    -- before this call, i.e. @1@ means somebody else is already holding the lock.
    lockState = atomicOrIntPVar mutex 1
    -- Only release the lock if it was this call that acquired it.
    unlockState wasLocked
      | wasLocked == 1 = pure ()
      | otherwise = void $ atomicAndIntPVar mutex 0
    runSchedulerWS isLocked
      | isLocked == 1 = liftIO $ throwIO MutexException
      | otherwise =
        withScheduler' (_workerStatesComp states) $ \scheduler ->
          action (SchedulerWS states scheduler)


-- | The most basic scheduler that simply runs the task instead of scheduling it. Early termination
-- requests are bluntly ignored.
--
-- It reports a single worker, runs every submitted job right away with @`WorkerId` 0@,
-- never accumulates any results, always reports batch id @0@ and refuses every batch
-- cancellation request by returning `False`.
--
-- @since 1.1.0
trivialScheduler_ :: Scheduler s ()
trivialScheduler_ =
  Scheduler
    { _numWorkers = 1
    , _scheduleWorkId = \f -> f (WorkerId 0)
    , _terminate = const $ pure ()
    , _waitForCurrentBatch = pure $ Finished []
    , _earlyResults = pure Nothing
    , _currentBatchId = pure $ BatchId 0
    , _cancelBatch = \_ _ -> pure False
    , _batchEarly = pure Nothing
    }


-- | This trivial scheduler will behave in a similar way as
-- `Control.Scheduler.withSchedulerR` with `Seq` computation strategy, except it is
-- restricted to `PrimMonad`, instead of `MonadUnliftIO` and the work isn't scheduled, but
-- rather computed immediately.
--
-- Every scheduled job is executed on the spot with @`WorkerId` 0@ and its result is
-- evaluated to weak head normal form before being recorded. The value returned by the
-- supplied action itself is discarded; only the recorded results are returned, after being
-- passed through `reverseResults`.
--
-- Note that a termination request does not abort the supplied action here: it merely
-- records the results collected so far as the final ones and hands the early value back to
-- the caller of the termination function.
--
-- @since 1.4.2
withTrivialSchedulerR :: forall a b m s. MonadPrim s m => (Scheduler s a -> m b) -> m (Results a)
withTrivialSchedulerR action = do
  -- Results of the current batch, most recent first.
  resVar <- newMutVar []
  batchVar <- newMutVar $ BatchId 0
  -- Populated only upon a termination request.
  finResVar <- newMutVar Nothing
  -- Populated only upon a successful batch cancellation.
  batchEarlyVar <- newMutVar Nothing
  -- The helpers below are polymorphic in the monad, because they are used both in @m@
  -- and in @`ST` s@ (inside of the scheduler record).
  let bumpCurrentBatchId :: MonadPrim s m' => m' ()
      bumpCurrentBatchId =
        atomicModifyMutVar' batchVar (\(BatchId x) -> (BatchId (x + 1), ()))
      -- Advance the batch id only if the supplied one is still the current one.
      bumpBatchId :: MonadPrim s m' => BatchId -> m' Bool
      bumpBatchId (BatchId c) =
        atomicModifyMutVar' batchVar $ \b@(BatchId x) ->
          if x == c
            then (BatchId (x + 1), True)
            else (b, False)
      -- Read and clear in one step.
      takeBatchEarly :: MonadPrim s m' => m' (Maybe (Early a))
      takeBatchEarly = atomicModifyMutVar' batchEarlyVar $ \mEarly -> (Nothing, mEarly)
      -- Read and clear in one step.
      takeResults :: MonadPrim s m' => m' [a]
      takeResults = atomicModifyMutVar' resVar $ \res -> ([], res)
  _ <-
    action $
    Scheduler
      { _numWorkers = 1
      , _scheduleWorkId =
          \f -> do
            r <- f (WorkerId 0)
            r `seq` atomicModifyMutVar' resVar (\rs -> (r : rs, ()))
      , _terminate =
          \early -> do
            bumpCurrentBatchId
            finishEarly <- collectResults (Just early) takeResults
            unEarly early <$ writeMutVar finResVar (Just finishEarly)
      , _waitForCurrentBatch =
          do mEarly <- takeBatchEarly
             bumpCurrentBatchId
             collectResults mEarly . pure =<< takeResults
      , _earlyResults = readMutVar finResVar
      , _currentBatchId = readMutVar batchVar
      , _batchEarly = takeBatchEarly
      , _cancelBatch =
          \batchId early -> do
            b <- bumpBatchId batchId
            when b $ writeMutVar batchEarlyVar (Just early)
            pure b
      }
  readMutVar finResVar >>= \case
    -- A termination request has been made, so the results were already collected.
    Just rs -> pure $ reverseResults rs
    Nothing -> do
      mEarly <- takeBatchEarly
      reverseResults <$> collectResults mEarly takeResults



-- | Same as `Control.Scheduler.withTrivialScheduler`, but works in `MonadUnliftIO` and
-- returns the results in their original LIFO order, i.e. the result of the most recently
-- executed job comes first.
--
-- Every scheduled job is executed on the spot with @`WorkerId` 0@ and its result is
-- evaluated to weak head normal form before being recorded. The value returned by the
-- supplied action itself is discarded.
--
-- A termination request records the results collected so far and then aborts the supplied
-- action by throwing `TerminateEarlyException`, which is caught here. Any other exception
-- propagates to the caller.
--
-- @since 1.4.2
withTrivialSchedulerRIO :: MonadUnliftIO m => (Scheduler RealWorld a -> m b) -> m (Results a)
withTrivialSchedulerRIO action = do
  -- Results of the current batch, most recent first.
  resRef <- liftIO $ newIORef []
  batchRef <- liftIO $ newIORef $ BatchId 0
  -- Populated only upon a termination request.
  finResRef <- liftIO $ newIORef Nothing
  -- Populated only upon a successful batch cancellation.
  batchEarlyRef <- liftIO $ newIORef Nothing
  let bumpCurrentBatchId = atomicModifyIORefCAS_ (coerce batchRef) (+ (1 :: Int))
      -- Advance the batch id only if the supplied one is still the current one.
      bumpBatchId (BatchId c) =
        atomicModifyIORefCAS batchRef $ \b@(BatchId x) ->
          if x == c
            then (BatchId (x + 1), True)
            else (b, False)
      -- Read and clear in one step.
      takeBatchEarly = atomicModifyIORefCAS batchEarlyRef $ \mEarly -> (Nothing, mEarly)
      -- Read and clear in one step.
      takeResults = atomicModifyIORefCAS resRef $ \res -> ([], res)
      scheduler =
        Scheduler
          { _numWorkers = 1
          , _scheduleWorkId =
              \f -> do
                r <- f (WorkerId 0)
                r `seq` ioToPrim (atomicModifyIORefCAS_ resRef (r :))
          , _terminate =
              \ !early ->
                ioToPrim $ do
                  bumpCurrentBatchId
                  finishEarly <- collectResults (Just early) takeResults
                  atomicWriteIORef finResRef (Just finishEarly)
                  -- Abort the user action; this exception is caught further down.
                  throwIO TerminateEarlyException
          , _waitForCurrentBatch =
              ioToPrim $ do
                bumpCurrentBatchId
                mEarly <- takeBatchEarly
                collectResults mEarly . pure =<< takeResults
          , _earlyResults = ioToPrim (readIORef finResRef)
          , _currentBatchId = ioToPrim (readIORef batchRef)
          , _batchEarly = ioToPrim takeBatchEarly
          , _cancelBatch =
              \batchId early ->
                ioToPrim $ do
                  b <- bumpBatchId batchId
                  when b $ atomicWriteIORef batchEarlyRef (Just early)
                  pure b
          }
  -- Only the early termination exception is intercepted, everything else is rethrown.
  _ :: Either TerminateEarlyException b <- withRunInIO $ \run -> try $ run $ action scheduler
  liftIO (readIORef finResRef) >>= \case
    -- A termination request has been made, so the results were already collected.
    Just rs -> pure rs
    Nothing ->
      liftIO $ do
        mEarly <- takeBatchEarly
        collectResults mEarly takeResults
{-# INLINEABLE withTrivialSchedulerRIO #-}


-- | This is generally a faster way to traverse while ignoring the result rather than using `mapM_`.
--
-- Implemented as a strict left fold that chains the actions together with `*>`, so the
-- effects are sequenced from the first element of the container to the last one. For an
-- empty container the result is @`pure` ()@.
--
-- @since 1.0.0
traverse_ :: (Applicative f, Foldable t) => (a -> f ()) -> t a -> f ()
traverse_ f = F.foldl' (\c a -> c *> f a) (pure ())
{-# INLINE traverse_ #-}

-- | Schedule a job while keeping its result. This is `scheduleJobsWith` specialized to
-- `mkJob` as the job constructor.
scheduleJobs :: MonadIO m => Jobs m a -> (WorkerId -> m a) -> m ()
scheduleJobs = scheduleJobsWith mkJob
{-# INLINEABLE scheduleJobs #-}

-- | Ignores the result of computation, thus avoiding some overhead.
--
-- This is `scheduleJobsWith` with a job constructor that wraps the action in `Job_` and
-- supplies a result-storing callback that does nothing.
scheduleJobs_ :: MonadIO m => Jobs m a -> (WorkerId -> m b) -> m ()
scheduleJobs_ = scheduleJobsWith mkJob_
  where
    -- The callback for storing the result is a no-op, hence the result is dropped.
    mkJob_ job = pure (Job_ (void . job (\_ -> pure ())))
{-# INLINEABLE scheduleJobs_ #-}

-- | Generic way of placing an action onto the job queue.
--
-- The supplied job constructor receives a function that, given a callback for
-- storing the result and a 'WorkerId', runs the action, forces its result to
-- WHNF and hands it to the callback. Afterwards the queue counter is
-- incremented by one and the freshly built job is pushed onto the queue.
scheduleJobsWith ::
     MonadIO m
  => (((b -> m ()) -> WorkerId -> m ()) -> m (Job m a))
  -> Jobs m a
  -> (WorkerId -> m b)
  -> m ()
scheduleJobsWith mkJob' Jobs {jobsQueue, jobsQueueCount} action = do
  job <- mkJob' runAndStore
  -- The counter is bumped before the job becomes visible in the queue.
  _ <- liftIO (atomicAddIntPVar jobsQueueCount 1)
  pushJQueue jobsQueue job
  where
    runAndStore storeResult wid = do
      res <- action wid
      -- Evaluate the result to WHNF before it gets stored.
      storeResult $! res
{-# INLINEABLE scheduleJobsWith #-}


-- | Runs the worker until it is terminated with a `WorkerTerminateException` or is killed
-- by some other asynchronous exception, which will propagate to the user calling thread.
--
-- The worker repeatedly pops a job from the queue, runs it with the supplied
-- @unmask@ function and decrements the queue counter. Whatever happens is
-- communicated to the scheduler through the @jobsSchedulerStatus@ 'MVar'.
runWorker ::
     MonadUnliftIO m
  => (forall b. m b -> IO b)
  -> (forall c. IO c -> IO c)
  -> WorkerId
  -> Jobs m a
  -> IO ()
runWorker run unmask wId Jobs {jobsQueue, jobsQueueCount, jobsSchedulerStatus} = loop
  where
    loop :: IO ()
    loop = do
      -- popJQueue can block, but it is still interruptable
      outcome <-
        try $ do
          job <- run (popJQueue jobsQueue)
          unmask $ do
            run (job wId)
            atomicSubIntPVar jobsQueueCount 1
      case outcome of
        -- NOTE(review): presumably `atomicSubIntPVar` returns the value held
        -- before the decrement, so 1 means this was the last outstanding job
        -- -- confirm against its documentation.
        Right 1 -> reportStatus SchedulerIdle >>= resume
        Right _ -> loop
        Left exc
          | Just WorkerTerminateException <- asyncExceptionFromException exc -> pure ()
          | Just CancelBatchException <- asyncExceptionFromException exc -> loop
          | otherwise -> do
              blocked <- reportStatus (SchedulerWorkerException (WorkerException exc))
              -- Without blocking with putMVar above we would not be able to
              -- report an exception in the main thread, especially if `exc`
              -- is asynchronous.
              unless (isSyncException exc) $ throwIO exc
              resume blocked

    -- Hand a status over to the scheduler, capturing any exception that
    -- arrives while blocked on the MVar.
    reportStatus :: SchedulerStatus -> IO (Either SomeException ())
    reportStatus = try . putMVar jobsSchedulerStatus

    -- Decide what to do once the attempt to report a status has completed:
    -- keep working on success or batch cancellation, stop quietly on worker
    -- termination and rethrow anything else.
    resume :: Either SomeException () -> IO ()
    resume = \case
      Right () -> loop
      Left exc
        | Just WorkerTerminateException <- asyncExceptionFromException exc -> pure ()
        | Just CancelBatchException <- asyncExceptionFromException exc -> loop
        | otherwise -> throwIO exc
{-# INLINEABLE runWorker #-}


-- | Allocate all of the mutable state shared between a scheduler and its
-- workers and return it together with a function that builds the 'Scheduler'
-- record once the worker thread ids are known.
--
-- The queue counter starts at 1, the status 'MVar' starts empty, the batch id
-- starts at 0 and both "early" references start as 'Nothing'.
initScheduler ::
     Comp -- ^ Computation strategy, used for finding the number of workers
  -> (Jobs IO a -> (WorkerId -> IO a) -> IO ()) -- ^ How to schedule work
  -> (JQueue IO a -> IO [a]) -- ^ How to collect results
  -> IO (Jobs IO a, [ThreadId] -> Scheduler RealWorld a)
initScheduler comp submitWork collect = do
  jobsNumWorkers <- getCompWorkers comp
  jobsQueue <- newJQueue
  jobsQueueCount <- newPVar 1
  jobsSchedulerStatus <- newEmptyMVar
  earlyTerminationResultRef <- newIORef Nothing
  batchIdRef <- newIORef (BatchId 0)
  batchEarlyRef <- newIORef Nothing
  let jobs = Jobs {jobsNumWorkers, jobsQueue, jobsQueueCount, jobsSchedulerStatus}
      -- Unconditionally advance the current batch id by one.
      bumpCurrentBatchId = atomicModifyIORefCAS_ (coerce batchIdRef) (+ (1 :: Int))
      -- Advance the batch id only if it still equals the expected one; the
      -- result tells whether the bump took place.
      bumpBatchId (BatchId expected) =
        atomicModifyIORefCAS batchIdRef $ \current@(BatchId actual) ->
          if actual /= expected
            then (current, False)
            else (BatchId (actual + 1), True)
      -- Record the early termination result and abort the caller with
      -- `TerminateEarlyException`.
      terminateWith early = do
        finishEarly <-
          case early of
            Early r -> (`FinishedEarly` r) <$> collect jobsQueue
            EarlyWith r -> pure (FinishedEarlyWith r)
        bumpCurrentBatchId
        atomicWriteIORef earlyTerminationResultRef (Just finishEarly)
        throwIO TerminateEarlyException
      -- Block until the workers report a status for the current batch, then
      -- gather its results and reset the queue counter back to 1.
      waitForBatch tids = do
        scheduleJobs_ jobs (\_ -> void (atomicSubIntPVar jobsQueueCount 1))
        unblockPopJQueue jobsQueue
        status <- takeMVar jobsSchedulerStatus
        -- Take whatever early value was left by a cancellation and clear it.
        mEarly <- atomicModifyIORefCAS batchEarlyRef (\previous -> (Nothing, previous))
        rs <-
          case status of
            SchedulerIdle -> do
              blockPopJQueue jobsQueue
              bumpCurrentBatchId
              res <- collect jobsQueue
              res `seq` collectResults mEarly (pure res)
            SchedulerWorkerException (WorkerException exc)
              | Just CancelBatchException <- fromException exc -> do
                  clearPendingJQueue jobsQueue
                  -- Deliver the cancellation to every worker thread.
                  traverse_ (`throwTo` SomeAsyncException CancelBatchException) tids
                  res <- collect jobsQueue
                  collectResults mEarly (pure res)
              | otherwise -> throwIO exc
        rs <$ atomicWriteIntPVar jobsQueueCount 1
      -- Cancel the batch with the supplied id, provided it is still the
      -- current one. When it is, this throws `CancelBatchException` and
      -- therefore never reaches the final `pure`.
      cancelBatchWith batchId early = do
        bumped <- bumpBatchId batchId
        when bumped $ do
          blockPopJQueue jobsQueue
          atomicWriteIORef batchEarlyRef (Just early)
          throwIO CancelBatchException
        pure bumped
      mkScheduler tids =
        Scheduler
          { _numWorkers = jobsNumWorkers
          , _scheduleWorkId = \f -> ioToPrim (submitWork jobs (stToPrim . f))
          , _terminate = ioToPrim . terminateWith
          , _waitForCurrentBatch = ioToPrim (waitForBatch tids)
          , _earlyResults = ioToPrim (readIORef earlyTerminationResultRef)
          , _currentBatchId = ioToPrim (readIORef batchIdRef)
          , _batchEarly = ioToPrim (readIORef batchEarlyRef)
          , _cancelBatch = \batchId early -> ioToPrim (cancelBatchWith batchId early)
          }
  pure (jobs, mkScheduler)
{-# INLINEABLE initScheduler #-}

withSchedulerInternal ::
     Comp -- ^ Computation strategy
  -> (Jobs IO a -> (WorkerId -> IO a) -> IO ()) -- ^ How to schedule work
  -> (JQueue IO a -> IO [a]) -- ^ How to collect results
  -> (Scheduler RealWorld a -> IO b)
     -- ^ Action that will be scheduling all the work.
  -> IO (Results a)
withSchedulerInternal :: forall a b.
Comp
-> (Jobs IO a -> (WorkerId -> IO a) -> IO ())
-> (JQueue IO a -> IO [a])
-> (Scheduler RealWorld a -> IO b)
-> IO (Results a)
withSchedulerInternal Comp
comp Jobs IO a -> (WorkerId -> IO a) -> IO ()
submitWork JQueue IO a -> IO [a]
collect Scheduler RealWorld a -> IO b
onScheduler = do
  (jobs :: Jobs IO a
jobs@Jobs {Int
MVar SchedulerStatus
PVar Int RealWorld
JQueue IO a
jobsNumWorkers :: forall (m :: * -> *) a. Jobs m a -> Int
jobsQueue :: forall (m :: * -> *) a. Jobs m a -> JQueue m a
jobsQueueCount :: forall (m :: * -> *) a. Jobs m a -> PVar Int RealWorld
jobsSchedulerStatus :: forall (m :: * -> *) a. Jobs m a -> MVar SchedulerStatus
jobsNumWorkers :: Int
jobsQueue :: JQueue IO a
jobsQueueCount :: PVar Int RealWorld
jobsSchedulerStatus :: MVar SchedulerStatus
..}, [ThreadId] -> Scheduler RealWorld a
mkScheduler) <- Comp
-> (Jobs IO a -> (WorkerId -> IO a) -> IO ())
-> (JQueue IO a -> IO [a])
-> IO (Jobs IO a, [ThreadId] -> Scheduler RealWorld a)
forall a.
Comp
-> (Jobs IO a -> (WorkerId -> IO a) -> IO ())
-> (JQueue IO a -> IO [a])
-> IO (Jobs IO a, [ThreadId] -> Scheduler RealWorld a)
initScheduler Comp
comp Jobs IO a -> (WorkerId -> IO a) -> IO ()
submitWork JQueue IO a -> IO [a]
collect
  -- / Wait for the initial jobs to get scheduled before spawning off the workers, otherwise it
  -- would be trickier to identify the beginning and the end of a job pool.
  ((forall a. IO a -> IO a) -> IO (Results a)) -> IO (Results a)
forall b. ((forall a. IO a -> IO a) -> IO b) -> IO b
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO (((forall a. IO a -> IO a) -> IO (Results a)) -> IO (Results a))
-> ((forall a. IO a -> IO a) -> IO (Results a)) -> IO (Results a)
forall a b. (a -> b) -> a -> b
$ \forall a. IO a -> IO a
run -> do
    IO [ThreadId]
-> ([ThreadId] -> IO ())
-> ([ThreadId] -> IO (Results a))
-> IO (Results a)
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket (IO [ThreadId] -> IO [ThreadId]
forall a. IO a -> IO a
run (Jobs IO a -> Comp -> IO [ThreadId]
forall (m :: * -> *) a.
MonadUnliftIO m =>
Jobs m a -> Comp -> m [ThreadId]
spawnWorkers Jobs IO a
jobs Comp
comp)) [ThreadId] -> IO ()
terminateWorkers (([ThreadId] -> IO (Results a)) -> IO (Results a))
-> ([ThreadId] -> IO (Results a)) -> IO (Results a)
forall a b. (a -> b) -> a -> b
$ \[ThreadId]
tids ->
      let scheduler :: Scheduler RealWorld a
scheduler = [ThreadId] -> Scheduler RealWorld a
mkScheduler [ThreadId]
tids
          readEarlyTermination :: IO (Results a)
readEarlyTermination =
            ST (PrimState IO) (Maybe (Results a)) -> IO (Maybe (Results a))
forall (m :: * -> *) a. PrimMonad m => ST (PrimState m) a -> m a
stToPrim (Scheduler RealWorld a -> ST RealWorld (Maybe (Results a))
forall s a. Scheduler s a -> ST s (Maybe (Results a))
_earlyResults Scheduler RealWorld a
scheduler) IO (Maybe (Results a))
-> (Maybe (Results a) -> IO (Results a)) -> IO (Results a)
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
              Maybe (Results a)
Nothing -> [Char] -> IO (Results a)
forall a. HasCallStack => [Char] -> a
error [Char]
"Impossible: uninitialized early termination value"
              Just Results a
rs -> Results a -> IO (Results a)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Results a
rs
       in IO b -> IO (Either TerminateEarlyException b)
forall e a. Exception e => IO a -> IO (Either e a)
try (IO b -> IO b
forall a. IO a -> IO a
run (Scheduler RealWorld a -> IO b
onScheduler Scheduler RealWorld a
scheduler)) IO (Either TerminateEarlyException b)
-> (Either TerminateEarlyException b -> IO (Results a))
-> IO (Results a)
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
            Left TerminateEarlyException
TerminateEarlyException -> IO (Results a) -> IO (Results a)
forall a. IO a -> IO a
run IO (Results a)
readEarlyTermination
            Right b
_ -> do
              IO () -> IO ()
forall a. IO a -> IO a
run (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Jobs IO a -> (WorkerId -> IO ()) -> IO ()
forall (m :: * -> *) a b.
MonadIO m =>
Jobs m a -> (WorkerId -> m b) -> m ()
scheduleJobs_ Jobs IO a
jobs (\WorkerId
_ -> IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ PVar Int RealWorld -> Int -> IO Int
forall s (m :: * -> *). MonadPrim s m => PVar Int s -> Int -> m Int
atomicSubIntPVar PVar Int RealWorld
jobsQueueCount Int
1)
              IO () -> IO ()
forall a. IO a -> IO a
run (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ JQueue IO a -> IO ()
forall (m :: * -> *) a. MonadIO m => JQueue m a -> m ()
unblockPopJQueue JQueue IO a
jobsQueue
              SchedulerStatus
status <- MVar SchedulerStatus -> IO SchedulerStatus
forall a. MVar a -> IO a
takeMVar MVar SchedulerStatus
jobsSchedulerStatus
                -- \ wait for all workers to finish. If any one of the workers had a problem, then
                -- this MVar will contain an exception
              case SchedulerStatus
status of
                SchedulerWorkerException (WorkerException SomeException
exc)
                  | Just TerminateEarlyException
TerminateEarlyException <- SomeException -> Maybe TerminateEarlyException
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
exc -> IO (Results a) -> IO (Results a)
forall a. IO a -> IO a
run IO (Results a)
readEarlyTermination
                  | Just CancelBatchException
CancelBatchException <- SomeException -> Maybe CancelBatchException
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
exc ->
                    IO (Results a) -> IO (Results a)
forall a. IO a -> IO a
run (IO (Results a) -> IO (Results a))
-> IO (Results a) -> IO (Results a)
forall a b. (a -> b) -> a -> b
$ do
                      Maybe (Early a)
mEarly <- ST (PrimState IO) (Maybe (Early a)) -> IO (Maybe (Early a))
forall (m :: * -> *) a. PrimMonad m => ST (PrimState m) a -> m a
stToPrim (ST (PrimState IO) (Maybe (Early a)) -> IO (Maybe (Early a)))
-> ST (PrimState IO) (Maybe (Early a)) -> IO (Maybe (Early a))
forall a b. (a -> b) -> a -> b
$ Scheduler RealWorld a -> ST RealWorld (Maybe (Early a))
forall s a. Scheduler s a -> ST s (Maybe (Early a))
_batchEarly Scheduler RealWorld a
scheduler
                      Maybe (Early a) -> IO [a] -> IO (Results a)
forall (f :: * -> *) a.
Applicative f =>
Maybe (Early a) -> f [a] -> f (Results a)
collectResults Maybe (Early a)
mEarly (JQueue IO a -> IO [a]
collect JQueue IO a
jobsQueue)
                  | Bool
otherwise -> SomeException -> IO (Results a)
forall e a. Exception e => e -> IO a
throwIO SomeException
exc
                  -- \ Here we need to unwrap the legitimate worker exception and rethrow it, so
                  -- that the main thread sees it as if it were its own
                SchedulerStatus
SchedulerIdle ->
                  IO (Results a) -> IO (Results a)
forall a. IO a -> IO a
run (IO (Results a) -> IO (Results a))
-> IO (Results a) -> IO (Results a)
forall a b. (a -> b) -> a -> b
$ do
                    Maybe (Early a)
mEarly <- ST (PrimState IO) (Maybe (Early a)) -> IO (Maybe (Early a))
forall (m :: * -> *) a. PrimMonad m => ST (PrimState m) a -> m a
stToPrim (ST (PrimState IO) (Maybe (Early a)) -> IO (Maybe (Early a)))
-> ST (PrimState IO) (Maybe (Early a)) -> IO (Maybe (Early a))
forall a b. (a -> b) -> a -> b
$ Scheduler RealWorld a -> ST RealWorld (Maybe (Early a))
forall s a. Scheduler s a -> ST s (Maybe (Early a))
_batchEarly Scheduler RealWorld a
scheduler
                    Maybe (Early a) -> IO [a] -> IO (Results a)
forall (f :: * -> *) a.
Applicative f =>
Maybe (Early a) -> f [a] -> f (Results a)
collectResults Maybe (Early a)
mEarly (JQueue IO a -> IO [a]
collect JQueue IO a
jobsQueue)
                  -- \ Now we are sure all workers have done their job we can safely read
                  -- all of the IORefs with results
{-# INLINEABLE withSchedulerInternal #-}


-- | Combine an optional early-termination marker with the action that gathers the
-- regular results:
--
-- * 'Nothing' - run @collect@ and wrap the list it produces in 'Finished'.
--
-- * @'Just' ('Early' r)@ - run @collect@ and store the list it produces together with
--   @r@ in 'FinishedEarly'.
--
-- * @'Just' ('EarlyWith' r)@ - produce 'FinishedEarlyWith' @r@; @collect@ is not run at
--   all in this case.
collectResults :: Applicative f => Maybe (Early a) -> f [a] -> f (Results a)
collectResults mEarly collect =
  case mEarly of
    Nothing -> Finished <$> collect
    Just (Early r) -> FinishedEarly <$> collect <*> pure r
    Just (EarlyWith r) -> pure $ FinishedEarlyWith r
{-# INLINEABLE collectResults #-}


-- | Fork the worker threads for the supplied 'Jobs' according to the computation
-- strategy and return the identifiers of all forked threads. Every forked thread
-- executes 'runWorker' and receives its own 'WorkerId', numbered from @0@ in the order
-- of forking.
--
-- * 'Par' - 'forkOnWithUnmask' is called with each of @[1 .. jobsNumWorkers]@
--
-- * 'ParOn' - 'forkOnWithUnmask' is called with each element of the supplied list
--
-- * 'ParN' - 'forkIOWithUnmask' is called @jobsNumWorkers@ times
--
-- * 'Seq' - 'forkIOWithUnmask' is called once
spawnWorkers :: forall m a. MonadUnliftIO m => Jobs m a -> Comp -> m [ThreadId]
spawnWorkers jobs@Jobs {jobsNumWorkers} =
  \case
    Par -> spawnWorkersWith forkOnWithUnmask [1 .. jobsNumWorkers]
    ParOn ws -> spawnWorkersWith forkOnWithUnmask ws
    ParN _ -> spawnWorkersWith (\_ -> forkIOWithUnmask) [1 .. jobsNumWorkers]
    Seq -> spawnWorkersWith (\_ -> forkIOWithUnmask) [1 :: Int]
    -- \ sequential computation is suboptimal when used in this way.
  where
    -- Fork one thread per element of the list. The element itself is handed to the
    -- forking function as its first argument, while the position in the list
    -- (starting at 0) becomes the 'WorkerId' of that thread.
    spawnWorkersWith ::
         MonadUnliftIO m
      => (Int -> ((forall c. IO c -> IO c) -> IO ()) -> IO ThreadId)
      -> [Int]
      -> m [ThreadId]
    spawnWorkersWith fork ws =
      withRunInIO $ \run ->
        forM (zip [0 ..] ws) $ \(wId, on) ->
          fork on $ \unmask -> runWorker run unmask wId jobs
{-# INLINEABLE spawnWorkers #-}

-- | Throw 'WorkerTerminateException', wrapped in 'SomeAsyncException', to every thread
-- in the list, one after another in list order.
terminateWorkers :: [ThreadId] -> IO ()
terminateWorkers = traverse_ (`throwTo` SomeAsyncException WorkerTerminateException)

-- | Conversion to a list. Elements are expected to be in the original LIFO order, so
-- calling 'reverse' is still necessary for getting the results in FIFO order.
--
-- * 'Finished' - the stored list is returned as is
--
-- * 'FinishedEarly' - the early result is placed in front of the stored list
--
-- * 'FinishedEarlyWith' - a singleton list with the early result
resultsToList :: Results a -> [a]
resultsToList = \case
  Finished rs -> rs
  FinishedEarly rs r -> r : rs
  FinishedEarlyWith r -> [r]
{-# INLINEABLE resultsToList #-}


-- | Reverse the list of results stored in 'Finished' or 'FinishedEarly'. The early
-- result of 'FinishedEarly' is kept as is, and any other value is returned unchanged.
reverseResults :: Results a -> Results a
reverseResults = \case
  Finished rs -> Finished (reverse rs)
  FinishedEarly rs r -> FinishedEarly (reverse rs) r
  res -> res
{-# INLINEABLE reverseResults #-}



-- Copies from unliftio

-- | Check whether an exception is synchronous. The exception is converted with
-- 'toException' and is considered asynchronous ('False') only when the outcome can be
-- recovered as 'SomeAsyncException' with 'fromException'; everything else is treated
-- as synchronous ('True').
isSyncException :: Exception e => e -> Bool
isSyncException exc =
  case fromException (toException exc) of
    Just (SomeAsyncException _) -> False
    Nothing -> True

-- | Acquire a resource, use it, and run the cleanup action only when the usage fails.
--
-- * @before@ is executed with asynchronous exceptions masked.
--
-- * @thing@ is executed on the acquired value with the masking state restored.
--
-- * If @thing@ throws any exception, @after@ is executed on the acquired value inside
--   'uninterruptibleMask_'. An exception thrown by @after@ itself is discarded and the
--   exception that came from @thing@ is rethrown.
--
-- * If @thing@ succeeds, its result is returned and @after@ is never executed.
safeBracketOnError :: MonadUnliftIO m => m a -> (a -> m b) -> (a -> m c) -> m c
safeBracketOnError before after thing = withRunInIO $ \run -> mask $ \restore -> do
  x <- run before
  res1 <- try $ restore $ run $ thing x
  case res1 of
    Left (e1 :: SomeException) -> do
      -- The outcome of the cleanup is ignored on purpose: the exception from the
      -- main action is the one that has to be reported.
      _ :: Either SomeException b <-
        try $ uninterruptibleMask_ $ run $ after x
      throwIO e1
    Right y -> return y