{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MonoLocalBinds #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# OPTIONS_HADDOCK hide, not-home #-}
module Control.Scheduler.Internal
( withSchedulerInternal
, initWorkerStates
, withSchedulerWSInternal
, trivialScheduler_
, withTrivialSchedulerR
, withTrivialSchedulerRIO
, initScheduler
, spawnWorkers
, terminateWorkers
, scheduleJobs
, scheduleJobs_
, scheduleJobsWith
, reverseResults
, resultsToList
, traverse_
, safeBracketOnError
) where
import Data.Coerce
import Control.Concurrent
import Control.Exception
import Control.Monad
import Control.Monad.IO.Unlift
import Control.Monad.Primitive
import Control.Scheduler.Computation
import Control.Scheduler.Types
import Control.Scheduler.Queue
import Data.Atomics (atomicModifyIORefCAS, atomicModifyIORefCAS_)
import qualified Data.Foldable as F (foldl')
import Data.IORef
import Data.Primitive.SmallArray
import Data.Primitive.MutVar
import Data.Primitive.PVar
initWorkerStates :: MonadIO m => Comp -> (WorkerId -> m ws) -> m (WorkerStates ws)
initWorkerStates :: forall (m :: * -> *) ws.
MonadIO m =>
Comp -> (WorkerId -> m ws) -> m (WorkerStates ws)
initWorkerStates Comp
comp WorkerId -> m ws
initState = do
Int
nWorkers <- Comp -> m Int
forall (m :: * -> *). MonadIO m => Comp -> m Int
getCompWorkers Comp
comp
SmallMutableArray RealWorld ws
arr <- IO (SmallMutableArray RealWorld ws)
-> m (SmallMutableArray RealWorld ws)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (SmallMutableArray RealWorld ws)
-> m (SmallMutableArray RealWorld ws))
-> IO (SmallMutableArray RealWorld ws)
-> m (SmallMutableArray RealWorld ws)
forall a b. (a -> b) -> a -> b
$ Int -> ws -> IO (SmallMutableArray (PrimState IO) ws)
forall (m :: * -> *) a.
PrimMonad m =>
Int -> a -> m (SmallMutableArray (PrimState m) a)
newSmallArray Int
nWorkers ([Char] -> ws
forall a. HasCallStack => [Char] -> a
error [Char]
"Uninitialized")
let go :: Int -> m ()
go Int
i =
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
i Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
nWorkers) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
ws
state <- WorkerId -> m ws
initState (Int -> WorkerId
WorkerId Int
i)
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SmallMutableArray (PrimState IO) ws -> Int -> ws -> IO ()
forall (m :: * -> *) a.
PrimMonad m =>
SmallMutableArray (PrimState m) a -> Int -> a -> m ()
writeSmallArray SmallMutableArray RealWorld ws
SmallMutableArray (PrimState IO) ws
arr Int
i ws
state
Int -> m ()
go (Int
i Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
Int -> m ()
go Int
0
SmallArray ws
workerStates <- IO (SmallArray ws) -> m (SmallArray ws)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (SmallArray ws) -> m (SmallArray ws))
-> IO (SmallArray ws) -> m (SmallArray ws)
forall a b. (a -> b) -> a -> b
$ SmallMutableArray (PrimState IO) ws -> IO (SmallArray ws)
forall (m :: * -> *) a.
PrimMonad m =>
SmallMutableArray (PrimState m) a -> m (SmallArray a)
unsafeFreezeSmallArray SmallMutableArray RealWorld ws
SmallMutableArray (PrimState IO) ws
arr
PVar Int RealWorld
mutex <- IO (PVar Int RealWorld) -> m (PVar Int RealWorld)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (PVar Int RealWorld) -> m (PVar Int RealWorld))
-> IO (PVar Int RealWorld) -> m (PVar Int RealWorld)
forall a b. (a -> b) -> a -> b
$ Int -> IO (PVar Int RealWorld)
forall s (m :: * -> *) a.
(MonadPrim s m, Prim a) =>
a -> m (PVar a s)
newPVar Int
0
WorkerStates ws -> m (WorkerStates ws)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
WorkerStates
{_workerStatesComp :: Comp
_workerStatesComp = Comp
comp, _workerStatesArray :: SmallArray ws
_workerStatesArray = SmallArray ws
workerStates, _workerStatesMutex :: PVar Int RealWorld
_workerStatesMutex = PVar Int RealWorld
mutex}
withSchedulerWSInternal ::
MonadUnliftIO m
=> (Comp -> (Scheduler RealWorld a -> t) -> m b)
-> WorkerStates ws
-> (SchedulerWS ws a -> t)
-> m b
withSchedulerWSInternal :: forall (m :: * -> *) a t b ws.
MonadUnliftIO m =>
(Comp -> (Scheduler RealWorld a -> t) -> m b)
-> WorkerStates ws -> (SchedulerWS ws a -> t) -> m b
withSchedulerWSInternal Comp -> (Scheduler RealWorld a -> t) -> m b
withScheduler' WorkerStates ws
states SchedulerWS ws a -> t
action =
((forall a. m a -> IO a) -> IO b) -> m b
forall b. ((forall a. m a -> IO a) -> IO b) -> m b
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO (((forall a. m a -> IO a) -> IO b) -> m b)
-> ((forall a. m a -> IO a) -> IO b) -> m b
forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run -> IO Int -> (Int -> IO ()) -> (Int -> IO b) -> IO b
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket IO Int
lockState Int -> IO ()
unlockState (m b -> IO b
forall a. m a -> IO a
run (m b -> IO b) -> (Int -> m b) -> Int -> IO b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> m b
runSchedulerWS)
where
mutex :: PVar Int RealWorld
mutex = WorkerStates ws -> PVar Int RealWorld
forall ws. WorkerStates ws -> PVar Int RealWorld
_workerStatesMutex WorkerStates ws
states
lockState :: IO Int
lockState = PVar Int RealWorld -> Int -> IO Int
forall s (m :: * -> *). MonadPrim s m => PVar Int s -> Int -> m Int
atomicOrIntPVar PVar Int RealWorld
mutex Int
1
unlockState :: Int -> IO ()
unlockState Int
wasLocked
| Int
wasLocked Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
1 = () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
| Bool
otherwise = IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ IO Int -> IO Int
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Int -> IO Int) -> IO Int -> IO Int
forall a b. (a -> b) -> a -> b
$ PVar Int RealWorld -> Int -> IO Int
forall s (m :: * -> *). MonadPrim s m => PVar Int s -> Int -> m Int
atomicAndIntPVar PVar Int RealWorld
mutex Int
0
runSchedulerWS :: Int -> m b
runSchedulerWS Int
isLocked
| Int
isLocked Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
1 = IO b -> m b
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO b -> m b) -> IO b -> m b
forall a b. (a -> b) -> a -> b
$ MutexException -> IO b
forall e a. Exception e => e -> IO a
throwIO MutexException
MutexException
| Bool
otherwise =
Comp -> (Scheduler RealWorld a -> t) -> m b
withScheduler' (WorkerStates ws -> Comp
forall ws. WorkerStates ws -> Comp
_workerStatesComp WorkerStates ws
states) ((Scheduler RealWorld a -> t) -> m b)
-> (Scheduler RealWorld a -> t) -> m b
forall a b. (a -> b) -> a -> b
$ \Scheduler RealWorld a
scheduler ->
SchedulerWS ws a -> t
action (WorkerStates ws -> Scheduler RealWorld a -> SchedulerWS ws a
forall ws a.
WorkerStates ws -> Scheduler RealWorld a -> SchedulerWS ws a
SchedulerWS WorkerStates ws
states Scheduler RealWorld a
scheduler)
trivialScheduler_ :: Scheduler s ()
trivialScheduler_ :: forall s. Scheduler s ()
trivialScheduler_ =
Scheduler
{ _numWorkers :: Int
_numWorkers = Int
1
, _scheduleWorkId :: (WorkerId -> ST s ()) -> ST s ()
_scheduleWorkId = \WorkerId -> ST s ()
f -> WorkerId -> ST s ()
f (Int -> WorkerId
WorkerId Int
0)
, _terminate :: Early () -> ST s ()
_terminate = ST s () -> Early () -> ST s ()
forall a b. a -> b -> a
const (ST s () -> Early () -> ST s ()) -> ST s () -> Early () -> ST s ()
forall a b. (a -> b) -> a -> b
$ () -> ST s ()
forall a. a -> ST s a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
, _waitForCurrentBatch :: ST s (Results ())
_waitForCurrentBatch = Results () -> ST s (Results ())
forall a. a -> ST s a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Results () -> ST s (Results ()))
-> Results () -> ST s (Results ())
forall a b. (a -> b) -> a -> b
$ [()] -> Results ()
forall a. [a] -> Results a
Finished []
, _earlyResults :: ST s (Maybe (Results ()))
_earlyResults = Maybe (Results ()) -> ST s (Maybe (Results ()))
forall a. a -> ST s a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (Results ())
forall a. Maybe a
Nothing
, _currentBatchId :: ST s BatchId
_currentBatchId = BatchId -> ST s BatchId
forall a. a -> ST s a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (BatchId -> ST s BatchId) -> BatchId -> ST s BatchId
forall a b. (a -> b) -> a -> b
$ Int -> BatchId
BatchId Int
0
, _cancelBatch :: BatchId -> Early () -> ST s Bool
_cancelBatch = \BatchId
_ Early ()
_ -> Bool -> ST s Bool
forall a. a -> ST s a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
False
, _batchEarly :: ST s (Maybe (Early ()))
_batchEarly = Maybe (Early ()) -> ST s (Maybe (Early ()))
forall a. a -> ST s a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (Early ())
forall a. Maybe a
Nothing
}
withTrivialSchedulerR :: forall a b m s. MonadPrim s m => (Scheduler s a -> m b) -> m (Results a)
withTrivialSchedulerR :: forall a b (m :: * -> *) s.
MonadPrim s m =>
(Scheduler s a -> m b) -> m (Results a)
withTrivialSchedulerR Scheduler s a -> m b
action = do
MutVar s [a]
resVar <- [a] -> m (MutVar (PrimState m) [a])
forall (m :: * -> *) a.
PrimMonad m =>
a -> m (MutVar (PrimState m) a)
newMutVar []
MutVar s BatchId
batchVar <- BatchId -> m (MutVar (PrimState m) BatchId)
forall (m :: * -> *) a.
PrimMonad m =>
a -> m (MutVar (PrimState m) a)
newMutVar (BatchId -> m (MutVar (PrimState m) BatchId))
-> BatchId -> m (MutVar (PrimState m) BatchId)
forall a b. (a -> b) -> a -> b
$ Int -> BatchId
BatchId Int
0
MutVar s (Maybe (Results a))
finResVar <- Maybe (Results a) -> m (MutVar (PrimState m) (Maybe (Results a)))
forall (m :: * -> *) a.
PrimMonad m =>
a -> m (MutVar (PrimState m) a)
newMutVar Maybe (Results a)
forall a. Maybe a
Nothing
MutVar s (Maybe (Early a))
batchEarlyVar <- Maybe (Early a) -> m (MutVar (PrimState m) (Maybe (Early a)))
forall (m :: * -> *) a.
PrimMonad m =>
a -> m (MutVar (PrimState m) a)
newMutVar Maybe (Early a)
forall a. Maybe a
Nothing
let bumpCurrentBatchId :: MonadPrim s m' => m' ()
bumpCurrentBatchId :: forall (m' :: * -> *). MonadPrim s m' => m' ()
bumpCurrentBatchId = MutVar (PrimState m') BatchId
-> (BatchId -> (BatchId, ())) -> m' ()
forall (m :: * -> *) a b.
PrimMonad m =>
MutVar (PrimState m) a -> (a -> (a, b)) -> m b
atomicModifyMutVar' MutVar s BatchId
MutVar (PrimState m') BatchId
batchVar (\(BatchId Int
x) -> (Int -> BatchId
BatchId (Int
x Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1), ()))
bumpBatchId :: MonadPrim s m' => BatchId -> m' Bool
bumpBatchId :: forall (m' :: * -> *). MonadPrim s m' => BatchId -> m' Bool
bumpBatchId (BatchId Int
c) =
MutVar (PrimState m') BatchId
-> (BatchId -> (BatchId, Bool)) -> m' Bool
forall (m :: * -> *) a b.
PrimMonad m =>
MutVar (PrimState m) a -> (a -> (a, b)) -> m b
atomicModifyMutVar' MutVar s BatchId
MutVar (PrimState m') BatchId
batchVar ((BatchId -> (BatchId, Bool)) -> m' Bool)
-> (BatchId -> (BatchId, Bool)) -> m' Bool
forall a b. (a -> b) -> a -> b
$ \b :: BatchId
b@(BatchId Int
x) ->
if Int
x Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
c
then (Int -> BatchId
BatchId (Int
x Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1), Bool
True)
else (BatchId
b, Bool
False)
takeBatchEarly :: MonadPrim s m' => m' (Maybe (Early a))
takeBatchEarly :: forall (m' :: * -> *). MonadPrim s m' => m' (Maybe (Early a))
takeBatchEarly = MutVar (PrimState m') (Maybe (Early a))
-> (Maybe (Early a) -> (Maybe (Early a), Maybe (Early a)))
-> m' (Maybe (Early a))
forall (m :: * -> *) a b.
PrimMonad m =>
MutVar (PrimState m) a -> (a -> (a, b)) -> m b
atomicModifyMutVar' MutVar s (Maybe (Early a))
MutVar (PrimState m') (Maybe (Early a))
batchEarlyVar ((Maybe (Early a) -> (Maybe (Early a), Maybe (Early a)))
-> m' (Maybe (Early a)))
-> (Maybe (Early a) -> (Maybe (Early a), Maybe (Early a)))
-> m' (Maybe (Early a))
forall a b. (a -> b) -> a -> b
$ \Maybe (Early a)
mEarly -> (Maybe (Early a)
forall a. Maybe a
Nothing, Maybe (Early a)
mEarly)
takeResults :: MonadPrim s m' => m' [a]
takeResults :: forall (m' :: * -> *). MonadPrim s m' => m' [a]
takeResults = MutVar (PrimState m') [a] -> ([a] -> ([a], [a])) -> m' [a]
forall (m :: * -> *) a b.
PrimMonad m =>
MutVar (PrimState m) a -> (a -> (a, b)) -> m b
atomicModifyMutVar' MutVar s [a]
MutVar (PrimState m') [a]
resVar (([a] -> ([a], [a])) -> m' [a]) -> ([a] -> ([a], [a])) -> m' [a]
forall a b. (a -> b) -> a -> b
$ \[a]
res -> ([], [a]
res)
b
_ <-
Scheduler s a -> m b
action (Scheduler s a -> m b) -> Scheduler s a -> m b
forall a b. (a -> b) -> a -> b
$
Scheduler
{ _numWorkers :: Int
_numWorkers = Int
1
, _scheduleWorkId :: (WorkerId -> ST s a) -> ST s ()
_scheduleWorkId =
\WorkerId -> ST s a
f -> do
a
r <- WorkerId -> ST s a
f (Int -> WorkerId
WorkerId Int
0)
a
r a -> ST s () -> ST s ()
forall a b. a -> b -> b
`seq` MutVar (PrimState (ST s)) [a] -> ([a] -> ([a], ())) -> ST s ()
forall (m :: * -> *) a b.
PrimMonad m =>
MutVar (PrimState m) a -> (a -> (a, b)) -> m b
atomicModifyMutVar' MutVar s [a]
MutVar (PrimState (ST s)) [a]
resVar (\[a]
rs -> (a
r a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [a]
rs, ()))
, _terminate :: Early a -> ST s a
_terminate =
\Early a
early -> do
ST s ()
forall (m' :: * -> *). MonadPrim s m' => m' ()
bumpCurrentBatchId
Results a
finishEarly <- Maybe (Early a) -> ST s [a] -> ST s (Results a)
forall (f :: * -> *) a.
Applicative f =>
Maybe (Early a) -> f [a] -> f (Results a)
collectResults (Early a -> Maybe (Early a)
forall a. a -> Maybe a
Just Early a
early) ST s [a]
forall (m' :: * -> *). MonadPrim s m' => m' [a]
takeResults
Early a -> a
forall a. Early a -> a
unEarly Early a
early a -> ST s () -> ST s a
forall a b. a -> ST s b -> ST s a
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ MutVar (PrimState (ST s)) (Maybe (Results a))
-> Maybe (Results a) -> ST s ()
forall (m :: * -> *) a.
PrimMonad m =>
MutVar (PrimState m) a -> a -> m ()
writeMutVar MutVar s (Maybe (Results a))
MutVar (PrimState (ST s)) (Maybe (Results a))
finResVar (Results a -> Maybe (Results a)
forall a. a -> Maybe a
Just Results a
finishEarly)
, _waitForCurrentBatch :: ST s (Results a)
_waitForCurrentBatch =
do Maybe (Early a)
mEarly <- ST s (Maybe (Early a))
forall (m' :: * -> *). MonadPrim s m' => m' (Maybe (Early a))
takeBatchEarly
ST s ()
forall (m' :: * -> *). MonadPrim s m' => m' ()
bumpCurrentBatchId
Maybe (Early a) -> ST s [a] -> ST s (Results a)
forall (f :: * -> *) a.
Applicative f =>
Maybe (Early a) -> f [a] -> f (Results a)
collectResults Maybe (Early a)
mEarly (ST s [a] -> ST s (Results a))
-> ([a] -> ST s [a]) -> [a] -> ST s (Results a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [a] -> ST s [a]
forall a. a -> ST s a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([a] -> ST s (Results a)) -> ST s [a] -> ST s (Results a)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< ST s [a]
forall (m' :: * -> *). MonadPrim s m' => m' [a]
takeResults
, _earlyResults :: ST s (Maybe (Results a))
_earlyResults = MutVar (PrimState (ST s)) (Maybe (Results a))
-> ST s (Maybe (Results a))
forall (m :: * -> *) a.
PrimMonad m =>
MutVar (PrimState m) a -> m a
readMutVar MutVar s (Maybe (Results a))
MutVar (PrimState (ST s)) (Maybe (Results a))
finResVar
, _currentBatchId :: ST s BatchId
_currentBatchId = MutVar (PrimState (ST s)) BatchId -> ST s BatchId
forall (m :: * -> *) a.
PrimMonad m =>
MutVar (PrimState m) a -> m a
readMutVar MutVar s BatchId
MutVar (PrimState (ST s)) BatchId
batchVar
, _batchEarly :: ST s (Maybe (Early a))
_batchEarly = ST s (Maybe (Early a))
forall (m' :: * -> *). MonadPrim s m' => m' (Maybe (Early a))
takeBatchEarly
, _cancelBatch :: BatchId -> Early a -> ST s Bool
_cancelBatch =
\BatchId
batchId Early a
early -> do
Bool
b <- BatchId -> ST s Bool
forall (m' :: * -> *). MonadPrim s m' => BatchId -> m' Bool
bumpBatchId BatchId
batchId
Bool -> ST s () -> ST s ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
b (ST s () -> ST s ()) -> ST s () -> ST s ()
forall a b. (a -> b) -> a -> b
$ MutVar (PrimState (ST s)) (Maybe (Early a))
-> Maybe (Early a) -> ST s ()
forall (m :: * -> *) a.
PrimMonad m =>
MutVar (PrimState m) a -> a -> m ()
writeMutVar MutVar s (Maybe (Early a))
MutVar (PrimState (ST s)) (Maybe (Early a))
batchEarlyVar (Early a -> Maybe (Early a)
forall a. a -> Maybe a
Just Early a
early)
Bool -> ST s Bool
forall a. a -> ST s a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
b
}
MutVar (PrimState m) (Maybe (Results a)) -> m (Maybe (Results a))
forall (m :: * -> *) a.
PrimMonad m =>
MutVar (PrimState m) a -> m a
readMutVar MutVar s (Maybe (Results a))
MutVar (PrimState m) (Maybe (Results a))
finResVar m (Maybe (Results a))
-> (Maybe (Results a) -> m (Results a)) -> m (Results a)
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Just Results a
rs -> Results a -> m (Results a)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Results a -> m (Results a)) -> Results a -> m (Results a)
forall a b. (a -> b) -> a -> b
$ Results a -> Results a
forall a. Results a -> Results a
reverseResults Results a
rs
Maybe (Results a)
Nothing -> do
Maybe (Early a)
mEarly <- m (Maybe (Early a))
forall (m' :: * -> *). MonadPrim s m' => m' (Maybe (Early a))
takeBatchEarly
Results a -> Results a
forall a. Results a -> Results a
reverseResults (Results a -> Results a) -> m (Results a) -> m (Results a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe (Early a) -> m [a] -> m (Results a)
forall (f :: * -> *) a.
Applicative f =>
Maybe (Early a) -> f [a] -> f (Results a)
collectResults Maybe (Early a)
mEarly m [a]
forall (m' :: * -> *). MonadPrim s m' => m' [a]
takeResults
withTrivialSchedulerRIO :: MonadUnliftIO m => (Scheduler RealWorld a -> m b) -> m (Results a)
withTrivialSchedulerRIO :: forall (m :: * -> *) a b.
MonadUnliftIO m =>
(Scheduler RealWorld a -> m b) -> m (Results a)
withTrivialSchedulerRIO Scheduler RealWorld a -> m b
action = do
IORef [a]
resRef <- IO (IORef [a]) -> m (IORef [a])
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef [a]) -> m (IORef [a]))
-> IO (IORef [a]) -> m (IORef [a])
forall a b. (a -> b) -> a -> b
$ [a] -> IO (IORef [a])
forall a. a -> IO (IORef a)
newIORef []
IORef BatchId
batchRef <- IO (IORef BatchId) -> m (IORef BatchId)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef BatchId) -> m (IORef BatchId))
-> IO (IORef BatchId) -> m (IORef BatchId)
forall a b. (a -> b) -> a -> b
$ BatchId -> IO (IORef BatchId)
forall a. a -> IO (IORef a)
newIORef (BatchId -> IO (IORef BatchId)) -> BatchId -> IO (IORef BatchId)
forall a b. (a -> b) -> a -> b
$ Int -> BatchId
BatchId Int
0
IORef (Maybe (Results a))
finResRef <- IO (IORef (Maybe (Results a))) -> m (IORef (Maybe (Results a)))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef (Maybe (Results a))) -> m (IORef (Maybe (Results a))))
-> IO (IORef (Maybe (Results a))) -> m (IORef (Maybe (Results a)))
forall a b. (a -> b) -> a -> b
$ Maybe (Results a) -> IO (IORef (Maybe (Results a)))
forall a. a -> IO (IORef a)
newIORef Maybe (Results a)
forall a. Maybe a
Nothing
IORef (Maybe (Early a))
batchEarlyRef <- IO (IORef (Maybe (Early a))) -> m (IORef (Maybe (Early a)))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef (Maybe (Early a))) -> m (IORef (Maybe (Early a))))
-> IO (IORef (Maybe (Early a))) -> m (IORef (Maybe (Early a)))
forall a b. (a -> b) -> a -> b
$ Maybe (Early a) -> IO (IORef (Maybe (Early a)))
forall a. a -> IO (IORef a)
newIORef Maybe (Early a)
forall a. Maybe a
Nothing
let bumpCurrentBatchId :: IO ()
bumpCurrentBatchId = IORef Int -> (Int -> Int) -> IO ()
forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORefCAS_ (IORef BatchId -> IORef Int
forall a b. Coercible a b => a -> b
coerce IORef BatchId
batchRef) (Int -> Int -> Int
forall a. Num a => a -> a -> a
+ (Int
1 :: Int))
bumpBatchId :: BatchId -> IO Bool
bumpBatchId (BatchId Int
c) =
IORef BatchId -> (BatchId -> (BatchId, Bool)) -> IO Bool
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS IORef BatchId
batchRef ((BatchId -> (BatchId, Bool)) -> IO Bool)
-> (BatchId -> (BatchId, Bool)) -> IO Bool
forall a b. (a -> b) -> a -> b
$ \b :: BatchId
b@(BatchId Int
x) ->
if Int
x Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
c
then (Int -> BatchId
BatchId (Int
x Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1), Bool
True)
else (BatchId
b, Bool
False)
takeBatchEarly :: IO (Maybe (Early a))
takeBatchEarly = IORef (Maybe (Early a))
-> (Maybe (Early a) -> (Maybe (Early a), Maybe (Early a)))
-> IO (Maybe (Early a))
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS IORef (Maybe (Early a))
batchEarlyRef ((Maybe (Early a) -> (Maybe (Early a), Maybe (Early a)))
-> IO (Maybe (Early a)))
-> (Maybe (Early a) -> (Maybe (Early a), Maybe (Early a)))
-> IO (Maybe (Early a))
forall a b. (a -> b) -> a -> b
$ \Maybe (Early a)
mEarly -> (Maybe (Early a)
forall a. Maybe a
Nothing, Maybe (Early a)
mEarly)
takeResults :: IO [a]
takeResults = IORef [a] -> ([a] -> ([a], [a])) -> IO [a]
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS IORef [a]
resRef (([a] -> ([a], [a])) -> IO [a]) -> ([a] -> ([a], [a])) -> IO [a]
forall a b. (a -> b) -> a -> b
$ \[a]
res -> ([], [a]
res)
scheduler :: Scheduler RealWorld a
scheduler =
Scheduler
{ _numWorkers :: Int
_numWorkers = Int
1
, _scheduleWorkId :: (WorkerId -> ST RealWorld a) -> ST RealWorld ()
_scheduleWorkId =
\WorkerId -> ST RealWorld a
f -> do
a
r <- WorkerId -> ST RealWorld a
f (Int -> WorkerId
WorkerId Int
0)
a
r a -> ST RealWorld () -> ST RealWorld ()
forall a b. a -> b -> b
`seq` IO () -> ST RealWorld ()
forall (m :: * -> *) a.
(PrimMonad m, PrimState m ~ RealWorld) =>
IO a -> m a
ioToPrim (IORef [a] -> ([a] -> [a]) -> IO ()
forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORefCAS_ IORef [a]
resRef (a
r a -> [a] -> [a]
forall a. a -> [a] -> [a]
:))
, _terminate :: Early a -> ST RealWorld a
_terminate =
\ !Early a
early ->
IO a -> ST RealWorld a
forall (m :: * -> *) a.
(PrimMonad m, PrimState m ~ RealWorld) =>
IO a -> m a
ioToPrim (IO a -> ST RealWorld a) -> IO a -> ST RealWorld a
forall a b. (a -> b) -> a -> b
$ do
IO ()
bumpCurrentBatchId
Results a
finishEarly <- Maybe (Early a) -> IO [a] -> IO (Results a)
forall (f :: * -> *) a.
Applicative f =>
Maybe (Early a) -> f [a] -> f (Results a)
collectResults (Early a -> Maybe (Early a)
forall a. a -> Maybe a
Just Early a
early) IO [a]
takeResults
IORef (Maybe (Results a)) -> Maybe (Results a) -> IO ()
forall a. IORef a -> a -> IO ()
atomicWriteIORef IORef (Maybe (Results a))
finResRef (Results a -> Maybe (Results a)
forall a. a -> Maybe a
Just Results a
finishEarly)
TerminateEarlyException -> IO a
forall e a. Exception e => e -> IO a
throwIO TerminateEarlyException
TerminateEarlyException
, _waitForCurrentBatch :: ST RealWorld (Results a)
_waitForCurrentBatch =
IO (Results a) -> ST RealWorld (Results a)
forall (m :: * -> *) a.
(PrimMonad m, PrimState m ~ RealWorld) =>
IO a -> m a
ioToPrim (IO (Results a) -> ST RealWorld (Results a))
-> IO (Results a) -> ST RealWorld (Results a)
forall a b. (a -> b) -> a -> b
$ do
IO ()
bumpCurrentBatchId
Maybe (Early a)
mEarly <- IO (Maybe (Early a))
takeBatchEarly
Maybe (Early a) -> IO [a] -> IO (Results a)
forall (f :: * -> *) a.
Applicative f =>
Maybe (Early a) -> f [a] -> f (Results a)
collectResults Maybe (Early a)
mEarly (IO [a] -> IO (Results a))
-> ([a] -> IO [a]) -> [a] -> IO (Results a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [a] -> IO [a]
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([a] -> IO (Results a)) -> IO [a] -> IO (Results a)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< IO [a]
takeResults
, _earlyResults :: ST RealWorld (Maybe (Results a))
_earlyResults = IO (Maybe (Results a)) -> ST RealWorld (Maybe (Results a))
forall (m :: * -> *) a.
(PrimMonad m, PrimState m ~ RealWorld) =>
IO a -> m a
ioToPrim (IORef (Maybe (Results a)) -> IO (Maybe (Results a))
forall a. IORef a -> IO a
readIORef IORef (Maybe (Results a))
finResRef)
, _currentBatchId :: ST RealWorld BatchId
_currentBatchId = IO BatchId -> ST RealWorld BatchId
forall (m :: * -> *) a.
(PrimMonad m, PrimState m ~ RealWorld) =>
IO a -> m a
ioToPrim (IORef BatchId -> IO BatchId
forall a. IORef a -> IO a
readIORef IORef BatchId
batchRef)
, _batchEarly :: ST RealWorld (Maybe (Early a))
_batchEarly = IO (Maybe (Early a)) -> ST RealWorld (Maybe (Early a))
forall (m :: * -> *) a.
(PrimMonad m, PrimState m ~ RealWorld) =>
IO a -> m a
ioToPrim IO (Maybe (Early a))
takeBatchEarly
, _cancelBatch :: BatchId -> Early a -> ST RealWorld Bool
_cancelBatch =
\BatchId
batchId Early a
early -> IO Bool -> ST RealWorld Bool
forall (m :: * -> *) a.
(PrimMonad m, PrimState m ~ RealWorld) =>
IO a -> m a
ioToPrim (IO Bool -> ST RealWorld Bool) -> IO Bool -> ST RealWorld Bool
forall a b. (a -> b) -> a -> b
$ do
Bool
b <- BatchId -> IO Bool
bumpBatchId BatchId
batchId
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
b (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IORef (Maybe (Early a)) -> Maybe (Early a) -> IO ()
forall a. IORef a -> a -> IO ()
atomicWriteIORef IORef (Maybe (Early a))
batchEarlyRef (Early a -> Maybe (Early a)
forall a. a -> Maybe a
Just Early a
early)
Bool -> IO Bool
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
b
}
Either TerminateEarlyException b
_ :: Either TerminateEarlyException b <- ((forall a. m a -> IO a) -> IO (Either TerminateEarlyException b))
-> m (Either TerminateEarlyException b)
forall b. ((forall a. m a -> IO a) -> IO b) -> m b
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO (((forall a. m a -> IO a) -> IO (Either TerminateEarlyException b))
-> m (Either TerminateEarlyException b))
-> ((forall a. m a -> IO a)
-> IO (Either TerminateEarlyException b))
-> m (Either TerminateEarlyException b)
forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run -> IO b -> IO (Either TerminateEarlyException b)
forall e a. Exception e => IO a -> IO (Either e a)
try (IO b -> IO (Either TerminateEarlyException b))
-> IO b -> IO (Either TerminateEarlyException b)
forall a b. (a -> b) -> a -> b
$ m b -> IO b
forall a. m a -> IO a
run (m b -> IO b) -> m b -> IO b
forall a b. (a -> b) -> a -> b
$ Scheduler RealWorld a -> m b
action Scheduler RealWorld a
scheduler
IO (Maybe (Results a)) -> m (Maybe (Results a))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IORef (Maybe (Results a)) -> IO (Maybe (Results a))
forall a. IORef a -> IO a
readIORef IORef (Maybe (Results a))
finResRef) m (Maybe (Results a))
-> (Maybe (Results a) -> m (Results a)) -> m (Results a)
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Just Results a
rs -> Results a -> m (Results a)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Results a
rs
Maybe (Results a)
Nothing ->
IO (Results a) -> m (Results a)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Results a) -> m (Results a))
-> IO (Results a) -> m (Results a)
forall a b. (a -> b) -> a -> b
$ do
Maybe (Early a)
mEarly <- IO (Maybe (Early a))
takeBatchEarly
Maybe (Early a) -> IO [a] -> IO (Results a)
forall (f :: * -> *) a.
Applicative f =>
Maybe (Early a) -> f [a] -> f (Results a)
collectResults Maybe (Early a)
mEarly IO [a]
takeResults
{-# INLINEABLE withTrivialSchedulerRIO #-}
traverse_ :: (Applicative f, Foldable t) => (a -> f ()) -> t a -> f ()
traverse_ :: forall (f :: * -> *) (t :: * -> *) a.
(Applicative f, Foldable t) =>
(a -> f ()) -> t a -> f ()
traverse_ a -> f ()
f = (f () -> a -> f ()) -> f () -> t a -> f ()
forall b a. (b -> a -> b) -> b -> t a -> b
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
F.foldl' (\f ()
c a
a -> f ()
c f () -> f () -> f ()
forall a b. f a -> f b -> f b
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> a -> f ()
f a
a) (() -> f ()
forall a. a -> f a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ())
{-# INLINE traverse_ #-}
scheduleJobs :: MonadIO m => Jobs m a -> (WorkerId -> m a) -> m ()
scheduleJobs :: forall (m :: * -> *) a.
MonadIO m =>
Jobs m a -> (WorkerId -> m a) -> m ()
scheduleJobs = (((a -> m ()) -> WorkerId -> m ()) -> m (Job m a))
-> Jobs m a -> (WorkerId -> m a) -> m ()
forall (m :: * -> *) b a.
MonadIO m =>
(((b -> m ()) -> WorkerId -> m ()) -> m (Job m a))
-> Jobs m a -> (WorkerId -> m b) -> m ()
scheduleJobsWith ((a -> m ()) -> WorkerId -> m ()) -> m (Job m a)
forall (m :: * -> *) a.
MonadIO m =>
((a -> m ()) -> WorkerId -> m ()) -> m (Job m a)
mkJob
{-# INLINEABLE scheduleJobs #-}
scheduleJobs_ :: MonadIO m => Jobs m a -> (WorkerId -> m b) -> m ()
scheduleJobs_ :: forall (m :: * -> *) a b.
MonadIO m =>
Jobs m a -> (WorkerId -> m b) -> m ()
scheduleJobs_ = (((b -> m ()) -> WorkerId -> m ()) -> m (Job m a))
-> Jobs m a -> (WorkerId -> m b) -> m ()
forall (m :: * -> *) b a.
MonadIO m =>
(((b -> m ()) -> WorkerId -> m ()) -> m (Job m a))
-> Jobs m a -> (WorkerId -> m b) -> m ()
scheduleJobsWith (\(b -> m ()) -> WorkerId -> m ()
job -> Job m a -> m (Job m a)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((WorkerId -> m ()) -> Job m a
forall (m :: * -> *) a. (WorkerId -> m ()) -> Job m a
Job_ (m () -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m () -> m ()) -> (WorkerId -> m ()) -> WorkerId -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (b -> m ()) -> WorkerId -> m ()
job (\b
_ -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()))))
{-# INLINEABLE scheduleJobs_ #-}
scheduleJobsWith ::
MonadIO m
=> (((b -> m ()) -> WorkerId -> m ()) -> m (Job m a))
-> Jobs m a
-> (WorkerId -> m b)
-> m ()
scheduleJobsWith :: forall (m :: * -> *) b a.
MonadIO m =>
(((b -> m ()) -> WorkerId -> m ()) -> m (Job m a))
-> Jobs m a -> (WorkerId -> m b) -> m ()
scheduleJobsWith ((b -> m ()) -> WorkerId -> m ()) -> m (Job m a)
mkJob' Jobs {Int
MVar SchedulerStatus
PVar Int RealWorld
JQueue m a
jobsNumWorkers :: Int
jobsQueue :: JQueue m a
jobsQueueCount :: PVar Int RealWorld
jobsSchedulerStatus :: MVar SchedulerStatus
jobsNumWorkers :: forall (m :: * -> *) a. Jobs m a -> Int
jobsQueue :: forall (m :: * -> *) a. Jobs m a -> JQueue m a
jobsQueueCount :: forall (m :: * -> *) a. Jobs m a -> PVar Int RealWorld
jobsSchedulerStatus :: forall (m :: * -> *) a. Jobs m a -> MVar SchedulerStatus
..} WorkerId -> m b
action = do
Job m a
job <-
((b -> m ()) -> WorkerId -> m ()) -> m (Job m a)
mkJob' (((b -> m ()) -> WorkerId -> m ()) -> m (Job m a))
-> ((b -> m ()) -> WorkerId -> m ()) -> m (Job m a)
forall a b. (a -> b) -> a -> b
$ \b -> m ()
storeResult WorkerId
wid -> do
b
res <- WorkerId -> m b
action WorkerId
wid
b
res b -> m () -> m ()
forall a b. a -> b -> b
`seq` b -> m ()
storeResult b
res
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ PVar Int RealWorld -> Int -> IO Int
forall s (m :: * -> *). MonadPrim s m => PVar Int s -> Int -> m Int
atomicAddIntPVar PVar Int RealWorld
jobsQueueCount Int
1
JQueue m a -> Job m a -> m ()
forall (m :: * -> *) a. MonadIO m => JQueue m a -> Job m a -> m ()
pushJQueue JQueue m a
jobsQueue Job m a
job
{-# INLINEABLE scheduleJobsWith #-}
runWorker ::
MonadUnliftIO m
=> (forall b. m b -> IO b)
-> (forall c. IO c -> IO c)
-> WorkerId
-> Jobs m a
-> IO ()
runWorker :: forall (m :: * -> *) a.
MonadUnliftIO m =>
(forall b. m b -> IO b)
-> (forall a. IO a -> IO a) -> WorkerId -> Jobs m a -> IO ()
runWorker forall b. m b -> IO b
run forall a. IO a -> IO a
unmask WorkerId
wId Jobs {JQueue m a
jobsQueue :: forall (m :: * -> *) a. Jobs m a -> JQueue m a
jobsQueue :: JQueue m a
jobsQueue, PVar Int RealWorld
jobsQueueCount :: forall (m :: * -> *) a. Jobs m a -> PVar Int RealWorld
jobsQueueCount :: PVar Int RealWorld
jobsQueueCount, MVar SchedulerStatus
jobsSchedulerStatus :: forall (m :: * -> *) a. Jobs m a -> MVar SchedulerStatus
jobsSchedulerStatus :: MVar SchedulerStatus
jobsSchedulerStatus} = IO ()
go
where
onBlockedMVar :: Either SomeException () -> IO ()
onBlockedMVar Either SomeException ()
eUnblocked =
case Either SomeException ()
eUnblocked of
Right () -> IO ()
go
Left SomeException
uExc
| Just WorkerTerminateException
WorkerTerminateException <- SomeException -> Maybe WorkerTerminateException
forall e. Exception e => SomeException -> Maybe e
asyncExceptionFromException SomeException
uExc -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Left SomeException
uExc
| Just CancelBatchException
CancelBatchException <- SomeException -> Maybe CancelBatchException
forall e. Exception e => SomeException -> Maybe e
asyncExceptionFromException SomeException
uExc -> IO ()
go
Left SomeException
uExc -> SomeException -> IO ()
forall e a. Exception e => e -> IO a
throwIO SomeException
uExc
go :: IO ()
go = do
Either SomeException Int
eRes <- IO Int -> IO (Either SomeException Int)
forall e a. Exception e => IO a -> IO (Either e a)
try (IO Int -> IO (Either SomeException Int))
-> IO Int -> IO (Either SomeException Int)
forall a b. (a -> b) -> a -> b
$ do
WorkerId -> m ()
job <- m (WorkerId -> m ()) -> IO (WorkerId -> m ())
forall b. m b -> IO b
run (JQueue m a -> m (WorkerId -> m ())
forall (m :: * -> *) a.
MonadUnliftIO m =>
JQueue m a -> m (WorkerId -> m ())
popJQueue JQueue m a
jobsQueue)
IO Int -> IO Int
forall a. IO a -> IO a
unmask (m () -> IO ()
forall b. m b -> IO b
run (WorkerId -> m ()
job WorkerId
wId) IO () -> IO Int -> IO Int
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> PVar Int RealWorld -> Int -> IO Int
forall s (m :: * -> *). MonadPrim s m => PVar Int s -> Int -> m Int
atomicSubIntPVar PVar Int RealWorld
jobsQueueCount Int
1)
case Either SomeException Int
eRes of
Right Int
1 -> IO () -> IO (Either SomeException ())
forall e a. Exception e => IO a -> IO (Either e a)
try (MVar SchedulerStatus -> SchedulerStatus -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar SchedulerStatus
jobsSchedulerStatus SchedulerStatus
SchedulerIdle) IO (Either SomeException ())
-> (Either SomeException () -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Either SomeException () -> IO ()
onBlockedMVar
Right Int
_ -> IO ()
go
Left SomeException
exc
| Just WorkerTerminateException
WorkerTerminateException <- SomeException -> Maybe WorkerTerminateException
forall e. Exception e => SomeException -> Maybe e
asyncExceptionFromException SomeException
exc -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Left SomeException
exc
| Just CancelBatchException
CancelBatchException <- SomeException -> Maybe CancelBatchException
forall e. Exception e => SomeException -> Maybe e
asyncExceptionFromException SomeException
exc -> IO ()
go
Left SomeException
exc -> do
Either SomeException ()
eUnblocked <-
IO () -> IO (Either SomeException ())
forall e a. Exception e => IO a -> IO (Either e a)
try (IO () -> IO (Either SomeException ()))
-> IO () -> IO (Either SomeException ())
forall a b. (a -> b) -> a -> b
$ MVar SchedulerStatus -> SchedulerStatus -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar SchedulerStatus
jobsSchedulerStatus (WorkerException -> SchedulerStatus
SchedulerWorkerException (SomeException -> WorkerException
WorkerException SomeException
exc))
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (SomeException -> Bool
forall e. Exception e => e -> Bool
isSyncException SomeException
exc) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ SomeException -> IO ()
forall e a. Exception e => e -> IO a
throwIO SomeException
exc
Either SomeException () -> IO ()
onBlockedMVar Either SomeException ()
eUnblocked
{-# INLINEABLE runWorker #-}
initScheduler ::
Comp
-> (Jobs IO a -> (WorkerId -> IO a) -> IO ())
-> (JQueue IO a -> IO [a])
-> IO (Jobs IO a, [ThreadId] -> Scheduler RealWorld a)
initScheduler :: forall a.
Comp
-> (Jobs IO a -> (WorkerId -> IO a) -> IO ())
-> (JQueue IO a -> IO [a])
-> IO (Jobs IO a, [ThreadId] -> Scheduler RealWorld a)
initScheduler Comp
comp Jobs IO a -> (WorkerId -> IO a) -> IO ()
submitWork JQueue IO a -> IO [a]
collect = do
Int
jobsNumWorkers <- Comp -> IO Int
forall (m :: * -> *). MonadIO m => Comp -> m Int
getCompWorkers Comp
comp
JQueue IO a
jobsQueue <- IO (JQueue IO a)
forall (m :: * -> *) a. MonadIO m => m (JQueue m a)
newJQueue
PVar Int RealWorld
jobsQueueCount <- IO (PVar Int RealWorld) -> IO (PVar Int RealWorld)
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (PVar Int RealWorld) -> IO (PVar Int RealWorld))
-> IO (PVar Int RealWorld) -> IO (PVar Int RealWorld)
forall a b. (a -> b) -> a -> b
$ Int -> IO (PVar Int RealWorld)
forall s (m :: * -> *) a.
(MonadPrim s m, Prim a) =>
a -> m (PVar a s)
newPVar Int
1
MVar SchedulerStatus
jobsSchedulerStatus <- IO (MVar SchedulerStatus) -> IO (MVar SchedulerStatus)
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (MVar SchedulerStatus)
forall a. IO (MVar a)
newEmptyMVar
IORef (Maybe (Results a))
earlyTerminationResultRef <- IO (IORef (Maybe (Results a))) -> IO (IORef (Maybe (Results a)))
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef (Maybe (Results a))) -> IO (IORef (Maybe (Results a))))
-> IO (IORef (Maybe (Results a))) -> IO (IORef (Maybe (Results a)))
forall a b. (a -> b) -> a -> b
$ Maybe (Results a) -> IO (IORef (Maybe (Results a)))
forall a. a -> IO (IORef a)
newIORef Maybe (Results a)
forall a. Maybe a
Nothing
IORef BatchId
batchIdRef <- IO (IORef BatchId) -> IO (IORef BatchId)
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef BatchId) -> IO (IORef BatchId))
-> IO (IORef BatchId) -> IO (IORef BatchId)
forall a b. (a -> b) -> a -> b
$ BatchId -> IO (IORef BatchId)
forall a. a -> IO (IORef a)
newIORef (BatchId -> IO (IORef BatchId)) -> BatchId -> IO (IORef BatchId)
forall a b. (a -> b) -> a -> b
$ Int -> BatchId
BatchId Int
0
IORef (Maybe (Early a))
batchEarlyRef <- IO (IORef (Maybe (Early a))) -> IO (IORef (Maybe (Early a)))
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef (Maybe (Early a))) -> IO (IORef (Maybe (Early a))))
-> IO (IORef (Maybe (Early a))) -> IO (IORef (Maybe (Early a)))
forall a b. (a -> b) -> a -> b
$ Maybe (Early a) -> IO (IORef (Maybe (Early a)))
forall a. a -> IO (IORef a)
newIORef Maybe (Early a)
forall a. Maybe a
Nothing
let jobs :: Jobs IO a
jobs =
Jobs
{ jobsNumWorkers :: Int
jobsNumWorkers = Int
jobsNumWorkers
, jobsQueue :: JQueue IO a
jobsQueue = JQueue IO a
jobsQueue
, jobsQueueCount :: PVar Int RealWorld
jobsQueueCount = PVar Int RealWorld
jobsQueueCount
, jobsSchedulerStatus :: MVar SchedulerStatus
jobsSchedulerStatus = MVar SchedulerStatus
jobsSchedulerStatus
}
bumpCurrentBatchId :: IO ()
bumpCurrentBatchId = IORef Int -> (Int -> Int) -> IO ()
forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORefCAS_ (IORef BatchId -> IORef Int
forall a b. Coercible a b => a -> b
coerce IORef BatchId
batchIdRef) (Int -> Int -> Int
forall a. Num a => a -> a -> a
+ (Int
1 :: Int))
bumpBatchId :: BatchId -> IO Bool
bumpBatchId (BatchId Int
c) =
IORef BatchId -> (BatchId -> (BatchId, Bool)) -> IO Bool
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS IORef BatchId
batchIdRef ((BatchId -> (BatchId, Bool)) -> IO Bool)
-> (BatchId -> (BatchId, Bool)) -> IO Bool
forall a b. (a -> b) -> a -> b
$ \b :: BatchId
b@(BatchId Int
x) ->
if Int
x Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
c
then (Int -> BatchId
BatchId (Int
x Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1), Bool
True)
else (BatchId
b, Bool
False)
mkScheduler :: [ThreadId] -> Scheduler RealWorld a
mkScheduler [ThreadId]
tids =
Scheduler
{ _numWorkers :: Int
_numWorkers = Int
jobsNumWorkers
, _scheduleWorkId :: (WorkerId -> ST RealWorld a) -> ST RealWorld ()
_scheduleWorkId = \WorkerId -> ST RealWorld a
f -> IO () -> ST RealWorld ()
forall (m :: * -> *) a.
(PrimMonad m, PrimState m ~ RealWorld) =>
IO a -> m a
ioToPrim (IO () -> ST RealWorld ()) -> IO () -> ST RealWorld ()
forall a b. (a -> b) -> a -> b
$ Jobs IO a -> (WorkerId -> IO a) -> IO ()
submitWork Jobs IO a
jobs (ST RealWorld a -> IO a
ST (PrimState IO) a -> IO a
forall (m :: * -> *) a. PrimMonad m => ST (PrimState m) a -> m a
stToPrim (ST RealWorld a -> IO a)
-> (WorkerId -> ST RealWorld a) -> WorkerId -> IO a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. WorkerId -> ST RealWorld a
f)
, _terminate :: Early a -> ST RealWorld a
_terminate =
\Early a
early -> IO a -> ST RealWorld a
forall (m :: * -> *) a.
(PrimMonad m, PrimState m ~ RealWorld) =>
IO a -> m a
ioToPrim (IO a -> ST RealWorld a) -> IO a -> ST RealWorld a
forall a b. (a -> b) -> a -> b
$ do
Results a
finishEarly <-
case Early a
early of
Early a
r -> [a] -> a -> Results a
forall a. [a] -> a -> Results a
FinishedEarly ([a] -> a -> Results a) -> IO [a] -> IO (a -> Results a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> JQueue IO a -> IO [a]
collect JQueue IO a
jobsQueue IO (a -> Results a) -> IO a -> IO (Results a)
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> a -> IO a
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
r
EarlyWith a
r -> Results a -> IO (Results a)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Results a -> IO (Results a)) -> Results a -> IO (Results a)
forall a b. (a -> b) -> a -> b
$ a -> Results a
forall a. a -> Results a
FinishedEarlyWith a
r
IO a -> IO a
forall (m :: * -> *) a.
(PrimMonad m, PrimState m ~ RealWorld) =>
IO a -> m a
ioToPrim (IO a -> IO a) -> IO a -> IO a
forall a b. (a -> b) -> a -> b
$ do
IO ()
bumpCurrentBatchId
IORef (Maybe (Results a)) -> Maybe (Results a) -> IO ()
forall a. IORef a -> a -> IO ()
atomicWriteIORef IORef (Maybe (Results a))
earlyTerminationResultRef (Maybe (Results a) -> IO ()) -> Maybe (Results a) -> IO ()
forall a b. (a -> b) -> a -> b
$ Results a -> Maybe (Results a)
forall a. a -> Maybe a
Just Results a
finishEarly
TerminateEarlyException -> IO a
forall e a. Exception e => e -> IO a
throwIO TerminateEarlyException
TerminateEarlyException
, _waitForCurrentBatch :: ST RealWorld (Results a)
_waitForCurrentBatch = IO (Results a) -> ST RealWorld (Results a)
forall (m :: * -> *) a.
(PrimMonad m, PrimState m ~ RealWorld) =>
IO a -> m a
ioToPrim (IO (Results a) -> ST RealWorld (Results a))
-> IO (Results a) -> ST RealWorld (Results a)
forall a b. (a -> b) -> a -> b
$
do Jobs IO a -> (WorkerId -> IO ()) -> IO ()
forall (m :: * -> *) a b.
MonadIO m =>
Jobs m a -> (WorkerId -> m b) -> m ()
scheduleJobs_ Jobs IO a
jobs (\WorkerId
_ -> IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ PVar Int RealWorld -> Int -> IO Int
forall s (m :: * -> *). MonadPrim s m => PVar Int s -> Int -> m Int
atomicSubIntPVar PVar Int RealWorld
jobsQueueCount Int
1)
JQueue IO a -> IO ()
forall (m :: * -> *) a. MonadIO m => JQueue m a -> m ()
unblockPopJQueue JQueue IO a
jobsQueue
SchedulerStatus
status <- IO SchedulerStatus -> IO SchedulerStatus
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO SchedulerStatus -> IO SchedulerStatus)
-> IO SchedulerStatus -> IO SchedulerStatus
forall a b. (a -> b) -> a -> b
$ MVar SchedulerStatus -> IO SchedulerStatus
forall a. MVar a -> IO a
takeMVar MVar SchedulerStatus
jobsSchedulerStatus
Maybe (Early a)
mEarly <- IO (Maybe (Early a)) -> IO (Maybe (Early a))
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe (Early a)) -> IO (Maybe (Early a)))
-> IO (Maybe (Early a)) -> IO (Maybe (Early a))
forall a b. (a -> b) -> a -> b
$ IORef (Maybe (Early a))
-> (Maybe (Early a) -> (Maybe (Early a), Maybe (Early a)))
-> IO (Maybe (Early a))
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS IORef (Maybe (Early a))
batchEarlyRef ((Maybe (Early a) -> (Maybe (Early a), Maybe (Early a)))
-> IO (Maybe (Early a)))
-> (Maybe (Early a) -> (Maybe (Early a), Maybe (Early a)))
-> IO (Maybe (Early a))
forall a b. (a -> b) -> a -> b
$ \Maybe (Early a)
mEarly -> (Maybe (Early a)
forall a. Maybe a
Nothing, Maybe (Early a)
mEarly)
Results a
rs <-
case SchedulerStatus
status of
SchedulerWorkerException (WorkerException SomeException
exc) ->
case SomeException -> Maybe CancelBatchException
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
exc of
Just CancelBatchException
CancelBatchException -> do
()
_ <- JQueue IO a -> IO ()
forall (m :: * -> *) a. MonadIO m => JQueue m a -> m ()
clearPendingJQueue JQueue IO a
jobsQueue
IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
(ThreadId -> IO ()) -> [ThreadId] -> IO ()
forall (f :: * -> *) (t :: * -> *) a.
(Applicative f, Foldable t) =>
(a -> f ()) -> t a -> f ()
traverse_ (ThreadId -> SomeAsyncException -> IO ()
forall e. Exception e => ThreadId -> e -> IO ()
`throwTo` CancelBatchException -> SomeAsyncException
forall e. Exception e => e -> SomeAsyncException
SomeAsyncException CancelBatchException
CancelBatchException) [ThreadId]
tids
Maybe (Early a) -> IO [a] -> IO (Results a)
forall (f :: * -> *) a.
Applicative f =>
Maybe (Early a) -> f [a] -> f (Results a)
collectResults Maybe (Early a)
mEarly (IO [a] -> IO (Results a))
-> ([a] -> IO [a]) -> [a] -> IO (Results a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [a] -> IO [a]
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([a] -> IO (Results a)) -> IO [a] -> IO (Results a)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< JQueue IO a -> IO [a]
collect JQueue IO a
jobsQueue
Maybe CancelBatchException
Nothing -> IO (Results a) -> IO (Results a)
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Results a) -> IO (Results a))
-> IO (Results a) -> IO (Results a)
forall a b. (a -> b) -> a -> b
$ SomeException -> IO (Results a)
forall e a. Exception e => e -> IO a
throwIO SomeException
exc
SchedulerStatus
SchedulerIdle -> do
JQueue IO a -> IO ()
forall (m :: * -> *) a. MonadIO m => JQueue m a -> m ()
blockPopJQueue JQueue IO a
jobsQueue
IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO ()
bumpCurrentBatchId
[a]
res <- JQueue IO a -> IO [a]
collect JQueue IO a
jobsQueue
[a]
res [a] -> IO (Results a) -> IO (Results a)
forall a b. a -> b -> b
`seq` Maybe (Early a) -> IO [a] -> IO (Results a)
forall (f :: * -> *) a.
Applicative f =>
Maybe (Early a) -> f [a] -> f (Results a)
collectResults Maybe (Early a)
mEarly ([a] -> IO [a]
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure [a]
res)
Results a
rs Results a -> IO () -> IO (Results a)
forall a b. a -> IO b -> IO a
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (PVar Int RealWorld -> Int -> IO ()
forall s (m :: * -> *). MonadPrim s m => PVar Int s -> Int -> m ()
atomicWriteIntPVar PVar Int RealWorld
jobsQueueCount Int
1)
, _earlyResults :: ST RealWorld (Maybe (Results a))
_earlyResults = IO (Maybe (Results a)) -> ST RealWorld (Maybe (Results a))
forall (m :: * -> *) a.
(PrimMonad m, PrimState m ~ RealWorld) =>
IO a -> m a
ioToPrim (IORef (Maybe (Results a)) -> IO (Maybe (Results a))
forall a. IORef a -> IO a
readIORef IORef (Maybe (Results a))
earlyTerminationResultRef)
, _currentBatchId :: ST RealWorld BatchId
_currentBatchId = IO BatchId -> ST RealWorld BatchId
forall (m :: * -> *) a.
(PrimMonad m, PrimState m ~ RealWorld) =>
IO a -> m a
ioToPrim (IORef BatchId -> IO BatchId
forall a. IORef a -> IO a
readIORef IORef BatchId
batchIdRef)
, _batchEarly :: ST RealWorld (Maybe (Early a))
_batchEarly = IO (Maybe (Early a)) -> ST RealWorld (Maybe (Early a))
forall (m :: * -> *) a.
(PrimMonad m, PrimState m ~ RealWorld) =>
IO a -> m a
ioToPrim (IORef (Maybe (Early a)) -> IO (Maybe (Early a))
forall a. IORef a -> IO a
readIORef IORef (Maybe (Early a))
batchEarlyRef)
, _cancelBatch :: BatchId -> Early a -> ST RealWorld Bool
_cancelBatch =
\BatchId
batchId Early a
early -> IO Bool -> ST RealWorld Bool
forall (m :: * -> *) a.
(PrimMonad m, PrimState m ~ RealWorld) =>
IO a -> m a
ioToPrim (IO Bool -> ST RealWorld Bool) -> IO Bool -> ST RealWorld Bool
forall a b. (a -> b) -> a -> b
$ do
Bool
b <- IO Bool -> IO Bool
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> IO Bool) -> IO Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ BatchId -> IO Bool
bumpBatchId BatchId
batchId
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
b (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
JQueue IO a -> IO ()
forall (m :: * -> *) a. MonadIO m => JQueue m a -> m ()
blockPopJQueue JQueue IO a
jobsQueue
IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
IORef (Maybe (Early a)) -> Maybe (Early a) -> IO ()
forall a. IORef a -> a -> IO ()
atomicWriteIORef IORef (Maybe (Early a))
batchEarlyRef (Maybe (Early a) -> IO ()) -> Maybe (Early a) -> IO ()
forall a b. (a -> b) -> a -> b
$ Early a -> Maybe (Early a)
forall a. a -> Maybe a
Just Early a
early
CancelBatchException -> IO ()
forall e a. Exception e => e -> IO a
throwIO CancelBatchException
CancelBatchException
Bool -> IO Bool
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
b
}
(Jobs IO a, [ThreadId] -> Scheduler RealWorld a)
-> IO (Jobs IO a, [ThreadId] -> Scheduler RealWorld a)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Jobs IO a
jobs, [ThreadId] -> Scheduler RealWorld a
mkScheduler)
{-# INLINEABLE initScheduler #-}
withSchedulerInternal ::
Comp
-> (Jobs IO a -> (WorkerId -> IO a) -> IO ())
-> (JQueue IO a -> IO [a])
-> (Scheduler RealWorld a -> IO b)
-> IO (Results a)
withSchedulerInternal :: forall a b.
Comp
-> (Jobs IO a -> (WorkerId -> IO a) -> IO ())
-> (JQueue IO a -> IO [a])
-> (Scheduler RealWorld a -> IO b)
-> IO (Results a)
withSchedulerInternal Comp
comp Jobs IO a -> (WorkerId -> IO a) -> IO ()
submitWork JQueue IO a -> IO [a]
collect Scheduler RealWorld a -> IO b
onScheduler = do
(jobs :: Jobs IO a
jobs@Jobs {Int
MVar SchedulerStatus
PVar Int RealWorld
JQueue IO a
jobsNumWorkers :: forall (m :: * -> *) a. Jobs m a -> Int
jobsQueue :: forall (m :: * -> *) a. Jobs m a -> JQueue m a
jobsQueueCount :: forall (m :: * -> *) a. Jobs m a -> PVar Int RealWorld
jobsSchedulerStatus :: forall (m :: * -> *) a. Jobs m a -> MVar SchedulerStatus
jobsNumWorkers :: Int
jobsQueue :: JQueue IO a
jobsQueueCount :: PVar Int RealWorld
jobsSchedulerStatus :: MVar SchedulerStatus
..}, [ThreadId] -> Scheduler RealWorld a
mkScheduler) <- Comp
-> (Jobs IO a -> (WorkerId -> IO a) -> IO ())
-> (JQueue IO a -> IO [a])
-> IO (Jobs IO a, [ThreadId] -> Scheduler RealWorld a)
forall a.
Comp
-> (Jobs IO a -> (WorkerId -> IO a) -> IO ())
-> (JQueue IO a -> IO [a])
-> IO (Jobs IO a, [ThreadId] -> Scheduler RealWorld a)
initScheduler Comp
comp Jobs IO a -> (WorkerId -> IO a) -> IO ()
submitWork JQueue IO a -> IO [a]
collect
((forall a. IO a -> IO a) -> IO (Results a)) -> IO (Results a)
forall b. ((forall a. IO a -> IO a) -> IO b) -> IO b
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO (((forall a. IO a -> IO a) -> IO (Results a)) -> IO (Results a))
-> ((forall a. IO a -> IO a) -> IO (Results a)) -> IO (Results a)
forall a b. (a -> b) -> a -> b
$ \forall a. IO a -> IO a
run -> do
IO [ThreadId]
-> ([ThreadId] -> IO ())
-> ([ThreadId] -> IO (Results a))
-> IO (Results a)
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket (IO [ThreadId] -> IO [ThreadId]
forall a. IO a -> IO a
run (Jobs IO a -> Comp -> IO [ThreadId]
forall (m :: * -> *) a.
MonadUnliftIO m =>
Jobs m a -> Comp -> m [ThreadId]
spawnWorkers Jobs IO a
jobs Comp
comp)) [ThreadId] -> IO ()
terminateWorkers (([ThreadId] -> IO (Results a)) -> IO (Results a))
-> ([ThreadId] -> IO (Results a)) -> IO (Results a)
forall a b. (a -> b) -> a -> b
$ \[ThreadId]
tids ->
let scheduler :: Scheduler RealWorld a
scheduler = [ThreadId] -> Scheduler RealWorld a
mkScheduler [ThreadId]
tids
readEarlyTermination :: IO (Results a)
readEarlyTermination =
ST (PrimState IO) (Maybe (Results a)) -> IO (Maybe (Results a))
forall (m :: * -> *) a. PrimMonad m => ST (PrimState m) a -> m a
stToPrim (Scheduler RealWorld a -> ST RealWorld (Maybe (Results a))
forall s a. Scheduler s a -> ST s (Maybe (Results a))
_earlyResults Scheduler RealWorld a
scheduler) IO (Maybe (Results a))
-> (Maybe (Results a) -> IO (Results a)) -> IO (Results a)
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Maybe (Results a)
Nothing -> [Char] -> IO (Results a)
forall a. HasCallStack => [Char] -> a
error [Char]
"Impossible: uninitialized early termination value"
Just Results a
rs -> Results a -> IO (Results a)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Results a
rs
in IO b -> IO (Either TerminateEarlyException b)
forall e a. Exception e => IO a -> IO (Either e a)
try (IO b -> IO b
forall a. IO a -> IO a
run (Scheduler RealWorld a -> IO b
onScheduler Scheduler RealWorld a
scheduler)) IO (Either TerminateEarlyException b)
-> (Either TerminateEarlyException b -> IO (Results a))
-> IO (Results a)
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Left TerminateEarlyException
TerminateEarlyException -> IO (Results a) -> IO (Results a)
forall a. IO a -> IO a
run IO (Results a)
readEarlyTermination
Right b
_ -> do
IO () -> IO ()
forall a. IO a -> IO a
run (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Jobs IO a -> (WorkerId -> IO ()) -> IO ()
forall (m :: * -> *) a b.
MonadIO m =>
Jobs m a -> (WorkerId -> m b) -> m ()
scheduleJobs_ Jobs IO a
jobs (\WorkerId
_ -> IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ PVar Int RealWorld -> Int -> IO Int
forall s (m :: * -> *). MonadPrim s m => PVar Int s -> Int -> m Int
atomicSubIntPVar PVar Int RealWorld
jobsQueueCount Int
1)
IO () -> IO ()
forall a. IO a -> IO a
run (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ JQueue IO a -> IO ()
forall (m :: * -> *) a. MonadIO m => JQueue m a -> m ()
unblockPopJQueue JQueue IO a
jobsQueue
SchedulerStatus
status <- MVar SchedulerStatus -> IO SchedulerStatus
forall a. MVar a -> IO a
takeMVar MVar SchedulerStatus
jobsSchedulerStatus
case SchedulerStatus
status of
SchedulerWorkerException (WorkerException SomeException
exc)
| Just TerminateEarlyException
TerminateEarlyException <- SomeException -> Maybe TerminateEarlyException
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
exc -> IO (Results a) -> IO (Results a)
forall a. IO a -> IO a
run IO (Results a)
readEarlyTermination
| Just CancelBatchException
CancelBatchException <- SomeException -> Maybe CancelBatchException
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
exc ->
IO (Results a) -> IO (Results a)
forall a. IO a -> IO a
run (IO (Results a) -> IO (Results a))
-> IO (Results a) -> IO (Results a)
forall a b. (a -> b) -> a -> b
$ do
Maybe (Early a)
mEarly <- ST (PrimState IO) (Maybe (Early a)) -> IO (Maybe (Early a))
forall (m :: * -> *) a. PrimMonad m => ST (PrimState m) a -> m a
stToPrim (ST (PrimState IO) (Maybe (Early a)) -> IO (Maybe (Early a)))
-> ST (PrimState IO) (Maybe (Early a)) -> IO (Maybe (Early a))
forall a b. (a -> b) -> a -> b
$ Scheduler RealWorld a -> ST RealWorld (Maybe (Early a))
forall s a. Scheduler s a -> ST s (Maybe (Early a))
_batchEarly Scheduler RealWorld a
scheduler
Maybe (Early a) -> IO [a] -> IO (Results a)
forall (f :: * -> *) a.
Applicative f =>
Maybe (Early a) -> f [a] -> f (Results a)
collectResults Maybe (Early a)
mEarly (JQueue IO a -> IO [a]
collect JQueue IO a
jobsQueue)
| Bool
otherwise -> SomeException -> IO (Results a)
forall e a. Exception e => e -> IO a
throwIO SomeException
exc
SchedulerStatus
SchedulerIdle ->
IO (Results a) -> IO (Results a)
forall a. IO a -> IO a
run (IO (Results a) -> IO (Results a))
-> IO (Results a) -> IO (Results a)
forall a b. (a -> b) -> a -> b
$ do
Maybe (Early a)
mEarly <- ST (PrimState IO) (Maybe (Early a)) -> IO (Maybe (Early a))
forall (m :: * -> *) a. PrimMonad m => ST (PrimState m) a -> m a
stToPrim (ST (PrimState IO) (Maybe (Early a)) -> IO (Maybe (Early a)))
-> ST (PrimState IO) (Maybe (Early a)) -> IO (Maybe (Early a))
forall a b. (a -> b) -> a -> b
$ Scheduler RealWorld a -> ST RealWorld (Maybe (Early a))
forall s a. Scheduler s a -> ST s (Maybe (Early a))
_batchEarly Scheduler RealWorld a
scheduler
Maybe (Early a) -> IO [a] -> IO (Results a)
forall (f :: * -> *) a.
Applicative f =>
Maybe (Early a) -> f [a] -> f (Results a)
collectResults Maybe (Early a)
mEarly (JQueue IO a -> IO [a]
collect JQueue IO a
jobsQueue)
{-# INLINEABLE withSchedulerInternal #-}
collectResults :: Applicative f => Maybe (Early a) -> f [a] -> f (Results a)
collectResults :: forall (f :: * -> *) a.
Applicative f =>
Maybe (Early a) -> f [a] -> f (Results a)
collectResults Maybe (Early a)
mEarly f [a]
collect =
case Maybe (Early a)
mEarly of
Maybe (Early a)
Nothing -> [a] -> Results a
forall a. [a] -> Results a
Finished ([a] -> Results a) -> f [a] -> f (Results a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> f [a]
collect
Just (Early a
r) -> [a] -> a -> Results a
forall a. [a] -> a -> Results a
FinishedEarly ([a] -> a -> Results a) -> f [a] -> f (a -> Results a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> f [a]
collect f (a -> Results a) -> f a -> f (Results a)
forall a b. f (a -> b) -> f a -> f b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> a -> f a
forall a. a -> f a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
r
Just (EarlyWith a
r) -> Results a -> f (Results a)
forall a. a -> f a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Results a -> f (Results a)) -> Results a -> f (Results a)
forall a b. (a -> b) -> a -> b
$ a -> Results a
forall a. a -> Results a
FinishedEarlyWith a
r
{-# INLINEABLE collectResults #-}
spawnWorkers :: forall m a. MonadUnliftIO m => Jobs m a -> Comp -> m [ThreadId]
spawnWorkers :: forall (m :: * -> *) a.
MonadUnliftIO m =>
Jobs m a -> Comp -> m [ThreadId]
spawnWorkers jobs :: Jobs m a
jobs@Jobs {Int
jobsNumWorkers :: forall (m :: * -> *) a. Jobs m a -> Int
jobsNumWorkers :: Int
jobsNumWorkers} =
\case
Comp
Par -> MonadUnliftIO m =>
(Int -> ((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId)
-> [Int] -> m [ThreadId]
(Int -> ((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId)
-> [Int] -> m [ThreadId]
spawnWorkersWith Int -> ((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId
forkOnWithUnmask [Int
1 .. Int
jobsNumWorkers]
ParOn [Int]
ws -> MonadUnliftIO m =>
(Int -> ((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId)
-> [Int] -> m [ThreadId]
(Int -> ((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId)
-> [Int] -> m [ThreadId]
spawnWorkersWith Int -> ((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId
forkOnWithUnmask [Int]
ws
ParN Word16
_ -> MonadUnliftIO m =>
(Int -> ((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId)
-> [Int] -> m [ThreadId]
(Int -> ((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId)
-> [Int] -> m [ThreadId]
spawnWorkersWith (\Int
_ -> ((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId
forkIOWithUnmask) [Int
1 .. Int
jobsNumWorkers]
Comp
Seq -> MonadUnliftIO m =>
(Int -> ((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId)
-> [Int] -> m [ThreadId]
(Int -> ((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId)
-> [Int] -> m [ThreadId]
spawnWorkersWith (\Int
_ -> ((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId
forkIOWithUnmask) [Int
1 :: Int]
where
spawnWorkersWith ::
MonadUnliftIO m
=> (Int -> ((forall c. IO c -> IO c) -> IO ()) -> IO ThreadId)
-> [Int]
-> m [ThreadId]
spawnWorkersWith :: MonadUnliftIO m =>
(Int -> ((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId)
-> [Int] -> m [ThreadId]
spawnWorkersWith Int -> ((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId
fork [Int]
ws =
((forall a. m a -> IO a) -> IO [ThreadId]) -> m [ThreadId]
forall b. ((forall a. m a -> IO a) -> IO b) -> m b
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO (((forall a. m a -> IO a) -> IO [ThreadId]) -> m [ThreadId])
-> ((forall a. m a -> IO a) -> IO [ThreadId]) -> m [ThreadId]
forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run ->
[(WorkerId, Int)]
-> ((WorkerId, Int) -> IO ThreadId) -> IO [ThreadId]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM ([WorkerId] -> [Int] -> [(WorkerId, Int)]
forall a b. [a] -> [b] -> [(a, b)]
zip [WorkerId
0 ..] [Int]
ws) (((WorkerId, Int) -> IO ThreadId) -> IO [ThreadId])
-> ((WorkerId, Int) -> IO ThreadId) -> IO [ThreadId]
forall a b. (a -> b) -> a -> b
$ \(WorkerId
wId, Int
on) ->
Int -> ((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId
fork Int
on (((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId)
-> ((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ \forall a. IO a -> IO a
unmask -> (forall a. m a -> IO a)
-> (forall a. IO a -> IO a) -> WorkerId -> Jobs m a -> IO ()
forall (m :: * -> *) a.
MonadUnliftIO m =>
(forall b. m b -> IO b)
-> (forall a. IO a -> IO a) -> WorkerId -> Jobs m a -> IO ()
runWorker m b -> IO b
forall a. m a -> IO a
run IO c -> IO c
forall a. IO a -> IO a
unmask WorkerId
wId Jobs m a
jobs
{-# INLINEABLE spawnWorkers #-}
terminateWorkers :: [ThreadId] -> IO ()
terminateWorkers :: [ThreadId] -> IO ()
terminateWorkers = (ThreadId -> IO ()) -> [ThreadId] -> IO ()
forall (f :: * -> *) (t :: * -> *) a.
(Applicative f, Foldable t) =>
(a -> f ()) -> t a -> f ()
traverse_ (ThreadId -> SomeAsyncException -> IO ()
forall e. Exception e => ThreadId -> e -> IO ()
`throwTo` WorkerTerminateException -> SomeAsyncException
forall e. Exception e => e -> SomeAsyncException
SomeAsyncException WorkerTerminateException
WorkerTerminateException)
resultsToList :: Results a -> [a]
resultsToList :: forall a. Results a -> [a]
resultsToList = \case
Finished [a]
rs -> [a]
rs
FinishedEarly [a]
rs a
r -> a
ra -> [a] -> [a]
forall a. a -> [a] -> [a]
:[a]
rs
FinishedEarlyWith a
r -> [a
r]
{-# INLINEABLE resultsToList #-}
reverseResults :: Results a -> Results a
reverseResults :: forall a. Results a -> Results a
reverseResults = \case
Finished [a]
rs -> [a] -> Results a
forall a. [a] -> Results a
Finished ([a] -> [a]
forall a. [a] -> [a]
reverse [a]
rs)
FinishedEarly [a]
rs a
r -> [a] -> a -> Results a
forall a. [a] -> a -> Results a
FinishedEarly ([a] -> [a]
forall a. [a] -> [a]
reverse [a]
rs) a
r
Results a
res -> Results a
res
{-# INLINEABLE reverseResults #-}
isSyncException :: Exception e => e -> Bool
isSyncException :: forall e. Exception e => e -> Bool
isSyncException e
exc =
case SomeException -> Maybe SomeAsyncException
forall e. Exception e => SomeException -> Maybe e
fromException (e -> SomeException
forall e. Exception e => e -> SomeException
toException e
exc) of
Just (SomeAsyncException e
_) -> Bool
False
Maybe SomeAsyncException
Nothing -> Bool
True
safeBracketOnError :: MonadUnliftIO m => m a -> (a -> m b) -> (a -> m c) -> m c
safeBracketOnError :: forall (m :: * -> *) a b c.
MonadUnliftIO m =>
m a -> (a -> m b) -> (a -> m c) -> m c
safeBracketOnError m a
before a -> m b
after a -> m c
thing = ((forall a. m a -> IO a) -> IO c) -> m c
forall b. ((forall a. m a -> IO a) -> IO b) -> m b
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO (((forall a. m a -> IO a) -> IO c) -> m c)
-> ((forall a. m a -> IO a) -> IO c) -> m c
forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run -> ((forall a. IO a -> IO a) -> IO c) -> IO c
forall b. ((forall a. IO a -> IO a) -> IO b) -> IO b
mask (((forall a. IO a -> IO a) -> IO c) -> IO c)
-> ((forall a. IO a -> IO a) -> IO c) -> IO c
forall a b. (a -> b) -> a -> b
$ \forall a. IO a -> IO a
restore -> do
a
x <- m a -> IO a
forall a. m a -> IO a
run m a
before
Either SomeException c
res1 <- IO c -> IO (Either SomeException c)
forall e a. Exception e => IO a -> IO (Either e a)
try (IO c -> IO (Either SomeException c))
-> IO c -> IO (Either SomeException c)
forall a b. (a -> b) -> a -> b
$ IO c -> IO c
forall a. IO a -> IO a
restore (IO c -> IO c) -> IO c -> IO c
forall a b. (a -> b) -> a -> b
$ m c -> IO c
forall a. m a -> IO a
run (m c -> IO c) -> m c -> IO c
forall a b. (a -> b) -> a -> b
$ a -> m c
thing a
x
case Either SomeException c
res1 of
Left (SomeException
e1 :: SomeException) -> do
Either SomeException b
_ :: Either SomeException b <-
IO b -> IO (Either SomeException b)
forall e a. Exception e => IO a -> IO (Either e a)
try (IO b -> IO (Either SomeException b))
-> IO b -> IO (Either SomeException b)
forall a b. (a -> b) -> a -> b
$ IO b -> IO b
forall a. IO a -> IO a
uninterruptibleMask_ (IO b -> IO b) -> IO b -> IO b
forall a b. (a -> b) -> a -> b
$ m b -> IO b
forall a. m a -> IO a
run (m b -> IO b) -> m b -> IO b
forall a b. (a -> b) -> a -> b
$ a -> m b
after a
x
SomeException -> IO c
forall e a. Exception e => e -> IO a
throwIO SomeException
e1
Right c
y -> c -> IO c
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return c
y