Copyright | (c) Alexey Kuleshevich 2018-2021 |
---|---|
License | BSD3 |
Maintainer | Alexey Kuleshevich <lehins@yandex.ru> |
Stability | experimental |
Portability | non-portable |
Safe Haskell | Safe-Inferred |
Language | Haskell2010 |
Synopsis
- data Scheduler s a
- data SchedulerWS ws a
- data Results a
- = Finished [a]
- | FinishedEarly [a] !a
- | FinishedEarlyWith !a
- withScheduler :: MonadUnliftIO m => Comp -> (Scheduler RealWorld a -> m b) -> m [a]
- withScheduler_ :: MonadUnliftIO m => Comp -> (Scheduler RealWorld a -> m b) -> m ()
- withSchedulerR :: MonadUnliftIO m => Comp -> (Scheduler RealWorld a -> m b) -> m (Results a)
- withSchedulerWS :: MonadUnliftIO m => WorkerStates ws -> (SchedulerWS ws a -> m b) -> m [a]
- withSchedulerWS_ :: MonadUnliftIO m => WorkerStates ws -> (SchedulerWS ws () -> m b) -> m ()
- withSchedulerWSR :: MonadUnliftIO m => WorkerStates ws -> (SchedulerWS ws a -> m b) -> m (Results a)
- unwrapSchedulerWS :: SchedulerWS ws a -> Scheduler RealWorld a
- trivialScheduler_ :: Scheduler s ()
- withTrivialScheduler :: MonadPrim s m => (Scheduler s a -> m b) -> m [a]
- withTrivialSchedulerR :: forall a b m s. MonadPrim s m => (Scheduler s a -> m b) -> m (Results a)
- scheduleWork :: MonadPrimBase s m => Scheduler s a -> m a -> m ()
- scheduleWork_ :: MonadPrimBase s m => Scheduler s () -> m () -> m ()
- scheduleWorkId :: MonadPrimBase s m => Scheduler s a -> (WorkerId -> m a) -> m ()
- scheduleWorkId_ :: MonadPrimBase s m => Scheduler s () -> (WorkerId -> m ()) -> m ()
- scheduleWorkState :: MonadPrimBase RealWorld m => SchedulerWS ws a -> (ws -> m a) -> m ()
- scheduleWorkState_ :: MonadPrimBase RealWorld m => SchedulerWS ws () -> (ws -> m ()) -> m ()
- replicateWork :: MonadPrimBase s m => Scheduler s a -> Int -> m a -> m ()
- replicateWork_ :: MonadPrimBase s m => Scheduler s () -> Int -> m a -> m ()
- data Batch s a
- runBatch :: MonadPrimBase s m => Scheduler s a -> (Batch s a -> m c) -> m [a]
- runBatch_ :: MonadPrimBase s m => Scheduler s () -> (Batch s () -> m c) -> m ()
- runBatchR :: MonadPrimBase s m => Scheduler s a -> (Batch s a -> m c) -> m (Results a)
- cancelBatch :: MonadPrim s m => Batch s a -> a -> m Bool
- cancelBatch_ :: MonadPrim s m => Batch s () -> m Bool
- cancelBatchWith :: MonadPrim s m => Batch s a -> a -> m Bool
- hasBatchFinished :: MonadPrim s m => Batch s a -> m Bool
- getCurrentBatch :: MonadPrim s m => Scheduler s a -> m (Batch s a)
- terminate :: MonadPrim s m => Scheduler s a -> a -> m a
- terminate_ :: MonadPrim s m => Scheduler s () -> m ()
- terminateWith :: MonadPrim s m => Scheduler s a -> a -> m a
- newtype WorkerId = WorkerId {
- getWorkerId :: Int
- data WorkerStates ws
- numWorkers :: Scheduler s a -> Int
- workerStatesComp :: WorkerStates ws -> Comp
- initWorkerStates :: MonadIO m => Comp -> (WorkerId -> m ws) -> m (WorkerStates ws)
- data Comp where
- getCompWorkers :: MonadIO m => Comp -> m Int
- replicateConcurrently :: MonadUnliftIO m => Comp -> Int -> m a -> m [a]
- replicateConcurrently_ :: MonadUnliftIO m => Comp -> Int -> m a -> m ()
- traverseConcurrently :: (MonadUnliftIO m, Traversable t) => Comp -> (a -> m b) -> t a -> m (t b)
- traverseConcurrently_ :: (MonadUnliftIO m, Foldable t) => Comp -> (a -> m b) -> t a -> m ()
- traverse_ :: (Applicative f, Foldable t) => (a -> f ()) -> t a -> f ()
- data MutexException = MutexException
Scheduler
Main type for scheduling work. See withScheduler
or
withScheduler_
for ways to construct and use this data type.
Since: 1.0.0
data SchedulerWS ws a Source #
This is a wrapper around Scheduler
, but it also keeps a separate state for each
individual worker. See withSchedulerWS
or
withSchedulerWS_
for ways to construct and use this data type.
Since: 1.4.0
Computed results of scheduled jobs.
Since: 1.4.2
Finished [a] | Finished normally with all scheduled jobs completed |
FinishedEarly [a] !a | Finished early by the means of cancelBatch or terminate. |
FinishedEarlyWith !a | Finished early by the means of cancelBatchWith or terminateWith. |
Instances
Foldable Results Source # | |
Defined in Control.Scheduler.Types fold :: Monoid m => Results m -> m # foldMap :: Monoid m => (a -> m) -> Results a -> m # foldMap' :: Monoid m => (a -> m) -> Results a -> m # foldr :: (a -> b -> b) -> b -> Results a -> b # foldr' :: (a -> b -> b) -> b -> Results a -> b # foldl :: (b -> a -> b) -> b -> Results a -> b # foldl' :: (b -> a -> b) -> b -> Results a -> b # foldr1 :: (a -> a -> a) -> Results a -> a # foldl1 :: (a -> a -> a) -> Results a -> a # elem :: Eq a => a -> Results a -> Bool # maximum :: Ord a => Results a -> a # minimum :: Ord a => Results a -> a # | |
Traversable Results Source # | |
Functor Results Source # | |
Read a => Read (Results a) Source # | |
Show a => Show (Results a) Source # | |
Eq a => Eq (Results a) Source # | |
Regular
:: MonadUnliftIO m | |
=> Comp | Computation strategy |
-> (Scheduler RealWorld a -> m b) | Action that will be scheduling all the work. |
-> m [a] |
Initialize a scheduler and submit jobs that will be computed sequentially or in parallel,
which is determined by the Comp
utation strategy.
Here are some cool properties about the withScheduler
:
- This function will block until all of the submitted jobs have finished or at least one of them resulted in an exception, which will be re-thrown at the callsite.
- It is totally fine for nested jobs to submit more jobs for the same or other scheduler
- It is ok to initialize multiple schedulers at the same time, although that will likely result in suboptimal performance, unless workers are pinned to different capabilities.
- Warning It is pretty dangerous to schedule jobs that can block, because it might lead to a potential deadlock, if you are not careful. Consider this example. First execution works fine, since there are two scheduled workers, and one can unblock the other, but the second scenario immediately results in a deadlock.
>>>
withScheduler (ParOn [1,2]) $ \s -> newEmptyMVar >>= (\ mv -> scheduleWork s (readMVar mv) >> scheduleWork s (putMVar mv ()))
[(),()]
>>>
import System.Timeout
>>>
timeout 1000000 $ withScheduler (ParOn [1]) $ \s -> newEmptyMVar >>= (\ mv -> scheduleWork s (readMVar mv) >> scheduleWork s (putMVar mv ()))
Nothing
Important: In order to get work done truly in parallel, program needs to be compiled with
-threaded
GHC flag and executed with +RTS -N -RTS
to use all available cores.
Since: 1.0.0
:: MonadUnliftIO m | |
=> Comp | Computation strategy |
-> (Scheduler RealWorld a -> m b) | Action that will be scheduling all the work. |
-> m () |
Same as withScheduler
, but discards results of submitted jobs.
Since: 1.0.0
:: MonadUnliftIO m | |
=> Comp | Computation strategy |
-> (Scheduler RealWorld a -> m b) | Action that will be scheduling all the work. |
-> m (Results a) |
Same as withScheduler
, except instead of a list it produces Results
, which allows
for distinguishing between the ways computation was terminated.
Since: 1.4.2
Stateful workers
withSchedulerWS :: MonadUnliftIO m => WorkerStates ws -> (SchedulerWS ws a -> m b) -> m [a] Source #
Run a scheduler with stateful workers. Throws MutexException
if an attempt is made
to concurrently use the same WorkerStates
with another SchedulerWS
.
Examples
A good example of using stateful workers would be generation of random numbers in parallel. A lot of the time random number generators are not thread safe, so we can work around this problem with a separate stateful generator for each of the workers.
>>>
import Control.Monad as M ((>=>), replicateM)
>>>
import Control.Concurrent (yield, threadDelay)
>>>
import Data.List (sort)
>>>
-- ^ Above imports are used to make sure output is deterministic, which is needed for doctest
>>>
import System.Random.MWC as MWC
>>>
import Data.Vector.Unboxed as V (singleton)
>>>
states <- initWorkerStates (ParN 4) (MWC.initialize . V.singleton . fromIntegral . getWorkerId)
>>>
let scheduleGen scheduler = scheduleWorkState scheduler (MWC.uniform >=> \r -> yield >> threadDelay 200000 >> pure r)
>>>
sort <$> withSchedulerWS states (M.replicateM 4 . scheduleGen) :: IO [Double]
[0.21734983682025255,0.5000843862105709,0.5759825622603018,0.8587171114177893]
>>>
sort <$> withSchedulerWS states (M.replicateM 4 . scheduleGen) :: IO [Double]
[2.3598617298033475e-2,9.949679290089553e-2,0.38223134248645885,0.7408640677124702]
In the above example we use four different random number generators from
`mwc-random` in order to generate 4
numbers, all in separate threads. The subsequent call to the withSchedulerWS
function
with the same states
is allowed to reuse the same generators, thus avoiding expensive
initialization.
Side note - The example presented was crafted with slight trickery in order to guarantee that the output is deterministic, so if you run instructions exactly the same way in GHCI you will get the exact same output. Non-determinism comes from thread scheduling, rather than from random number generator, because we use exactly the same seed for each worker, but workers run concurrently. Exact output is not really needed, except for the doctests to pass.
Since: 1.4.0
withSchedulerWS_ :: MonadUnliftIO m => WorkerStates ws -> (SchedulerWS ws () -> m b) -> m () Source #
Run a scheduler with stateful workers, while discarding computation results.
Since: 1.4.0
withSchedulerWSR :: MonadUnliftIO m => WorkerStates ws -> (SchedulerWS ws a -> m b) -> m (Results a) Source #
Same as withSchedulerWS
, except instead of a list it produces Results
, which
allows for distinguishing between the ways computation was terminated.
Since: 1.4.2
unwrapSchedulerWS :: SchedulerWS ws a -> Scheduler RealWorld a Source #
Get the underlying Scheduler
, which cannot access WorkerStates
.
Since: 1.4.0
Trivial (no parallelism)
trivialScheduler_ :: Scheduler s () Source #
The most basic scheduler that simply runs the task instead of scheduling it. Early termination requests are bluntly ignored.
Since: 1.1.0
withTrivialScheduler :: MonadPrim s m => (Scheduler s a -> m b) -> m [a] Source #
This trivial scheduler will behave in the same way as withScheduler
with Seq
computation strategy, except it is restricted to PrimMonad
, instead of MonadUnliftIO
.
Since: 1.4.2
withTrivialSchedulerR :: forall a b m s. MonadPrim s m => (Scheduler s a -> m b) -> m (Results a) Source #
This trivial scheduler will behave in a similar way as
withSchedulerR
with Seq
computation strategy, except it is
restricted to PrimMonad
, instead of MonadUnliftIO
and the work isn't scheduled, but
rather computed immediately.
Since: 1.4.2
Scheduling computation
scheduleWork :: MonadPrimBase s m => Scheduler s a -> m a -> m () Source #
Schedule an action to be picked up and computed by a worker from a pool of
jobs. Similar to scheduleWorkId
, except the job doesn't get the worker id.
Since: 1.0.0
scheduleWork_ :: MonadPrimBase s m => Scheduler s () -> m () -> m () Source #
Same as scheduleWork
, but only for a Scheduler
that doesn't keep the results.
Since: 1.1.0
scheduleWorkId :: MonadPrimBase s m => Scheduler s a -> (WorkerId -> m a) -> m () Source #
Schedule an action to be picked up and computed by a worker from a pool of
jobs. Argument supplied to the job will be the id of the worker doing the job. This is
useful for identification of a thread that will be doing the work, since there is a
one-to-one mapping from ThreadId
to WorkerId
for a particular
scheduler.
Since: 1.2.0
scheduleWorkId_ :: MonadPrimBase s m => Scheduler s () -> (WorkerId -> m ()) -> m () Source #
Same as scheduleWorkId
, but only for a Scheduler
that doesn't keep the results.
Since: 1.2.0
scheduleWorkState :: MonadPrimBase RealWorld m => SchedulerWS ws a -> (ws -> m a) -> m () Source #
Schedule a job that will get a worker state passed as an argument
Since: 1.4.0
scheduleWorkState_ :: MonadPrimBase RealWorld m => SchedulerWS ws () -> (ws -> m ()) -> m () Source #
Same as scheduleWorkState
, but doesn't keep the result of computation.
Since: 1.4.0
replicateWork :: MonadPrimBase s m => Scheduler s a -> Int -> m a -> m () Source #
Schedule the same action to run n
times concurrently. This differs from
replicateConcurrently
by allowing the caller to use the Scheduler
freely,
or to allow early termination via terminate
across all (identical) threads.
To be called within a withScheduler
block.
Since: 2.0.0
replicateWork_ :: MonadPrimBase s m => Scheduler s () -> Int -> m a -> m () Source #
Same as replicateWork
, but it does not retain the results of scheduled jobs
Since: 2.0.0
Batches
Batch is an artificial checkpoint that can be controlled by the user throughout the lifetime of a scheduler.
Since: 1.5.0
runBatch :: MonadPrimBase s m => Scheduler s a -> (Batch s a -> m c) -> m [a] Source #
Run a single batch of jobs. Supplied action will not return until all jobs placed on
the queue are done or the whole batch is cancelled with one of these cancelBatch
,
cancelBatch_
or cancelBatchWith
.
It waits for all scheduled jobs to finish and collects the computed results into a list. It is a blocking operation, but if there are no jobs in progress it will return immediately. It is safe to continue using the supplied scheduler after this function returns. However, if any of the jobs resulted in an exception it will be rethrown by this function, which, unless caught, will further put the scheduler in a terminated state.
It is important to note that any job that hasn't had its results collected from the scheduler prior to starting the batch will end up on the batch result list.
Since: 1.5.0
runBatch_ :: MonadPrimBase s m => Scheduler s () -> (Batch s () -> m c) -> m () Source #
Same as runBatch
, except it ignores results of computation
Since: 1.5.0
cancelBatch :: MonadPrim s m => Batch s a -> a -> m Bool Source #
Cancel batch with supplied identifier, which will cause the scheduler to return
FinishedEarly
result. This is an idempotent operation and has no effect if the currently
running batch does not match supplied identifier. Returns False
when cancelling did
not succeed due to mismatched identifier or does not return at all since all jobs get
cancelled immediately. For trivial schedulers however there is no way to perform
concurrent cancelation and it will return True
.
Since: 1.5.0
cancelBatch_ :: MonadPrim s m => Batch s () -> m Bool Source #
Same as cancelBatch
, but only works with schedulers that don't care about results
Since: 1.5.0
cancelBatchWith :: MonadPrim s m => Batch s a -> a -> m Bool Source #
Same as cancelBatch
, but the result of computation will be set to FinishedEarlyWith
Since: 1.5.0
hasBatchFinished :: MonadPrim s m => Batch s a -> m Bool Source #
Check if the supplied batch has already finished.
Since: 1.5.0
getCurrentBatch :: MonadPrim s m => Scheduler s a -> m (Batch s a) Source #
This function gives a way to get access to the main batch that started implicitly.
Since: 1.5.0
Early termination
terminate :: MonadPrim s m => Scheduler s a -> a -> m a Source #
As soon as possible try to terminate any computation that is being performed by all
workers managed by this scheduler and collect whatever results have been computed, with
supplied element guaranteed to be the last one. In case when Results
type is
returned this function will cause the scheduler to produce FinishedEarly
Important - With Seq
strategy this will not stop other scheduled tasks from being computed,
although it will make sure their results are discarded.
Since: 1.1.0
terminate_ :: MonadPrim s m => Scheduler s () -> m () Source #
terminateWith :: MonadPrim s m => Scheduler s a -> a -> m a Source #
Same as terminate
, but returning a single element list containing the supplied
argument. This can be very useful for parallel search algorithms. In case when
Results
is the return type this function will cause the scheduler to produce
FinishedEarlyWith
Important - Same as with terminate
, when Seq
strategy is used, this will not prevent
computation from continuing, but the scheduler will return only the result supplied to this
function.
Since: 1.1.0
Workers
A unique id for the worker in the Scheduler
context. It will
always be a number from 0
up to, but not including, the number of workers a scheduler
has, which in turn can always be determined with numWorkers
function.
Since: 1.4.0
Instances
Bounded WorkerId Source # | |
Enum WorkerId Source # | |
Num WorkerId Source # | |
Read WorkerId Source # | |
Integral WorkerId Source # | |
Defined in Control.Scheduler.Queue | |
Real WorkerId Source # | |
Defined in Control.Scheduler.Queue toRational :: WorkerId -> Rational # | |
Show WorkerId Source # | |
Eq WorkerId Source # | |
Ord WorkerId Source # | |
Defined in Control.Scheduler.Queue |
data WorkerStates ws Source #
Each worker is capable of keeping its own state, which can be shared between different
schedulers, but not at the same time. In other words using the same WorkerStates
on
withSchedulerWS
concurrently will result in an error. Can be initialized with
initWorkerStates
Since: 1.4.0
numWorkers :: Scheduler s a -> Int Source #
Get the number of workers. Will mainly depend on the computation strategy and/or number of
capabilities you have. Related function is getCompWorkers
.
Since: 1.0.0
workerStatesComp :: WorkerStates ws -> Comp Source #
Get the computation strategy the states were initialized with.
Since: 1.4.0
initWorkerStates :: MonadIO m => Comp -> (WorkerId -> m ws) -> m (WorkerStates ws) Source #
Initialize a separate state for each worker.
Since: 1.4.0
Computation strategies
Computation strategy to use when scheduling work.
Seq | Sequential computation |
ParOn ![Int] | Schedule workers to run on specific capabilities. Specifying an empty list |
ParN !Word16 | Specify the number of workers that will be handling all the jobs. Difference from |
pattern Par :: Comp | Parallel computation using all available cores. Same as Since: 1.0.0 |
pattern Par' :: Comp | Parallel computation using all available cores. Same as Since: 1.1.0 |
getCompWorkers :: MonadIO m => Comp -> m Int Source #
Figure out how many workers this computation strategy will create.
Note - If at any point during program execution global number of capabilities gets
changed with setNumCapabilities
, it will have no effect on this
function, unless it hasn't yet been called with Par
or Par'
arguments.
Since: 1.1.0
Useful functions
replicateConcurrently :: MonadUnliftIO m => Comp -> Int -> m a -> m [a] Source #
Replicate an action n
times and schedule them according to the supplied computation
strategy.
Since: 1.1.0
replicateConcurrently_ :: MonadUnliftIO m => Comp -> Int -> m a -> m () Source #
Just like replicateConcurrently
, but discards the results of computation.
Since: 1.1.0
traverseConcurrently :: (MonadUnliftIO m, Traversable t) => Comp -> (a -> m b) -> t a -> m (t b) Source #
Map an action over each element of the Traversable
t
according to the supplied computation
strategy.
Since: 1.0.0
traverseConcurrently_ :: (MonadUnliftIO m, Foldable t) => Comp -> (a -> m b) -> t a -> m () Source #
Just like traverseConcurrently
, but restricted to Foldable
and discards the results of
computation.
Since: 1.0.0
traverse_ :: (Applicative f, Foldable t) => (a -> f ()) -> t a -> f () Source #
This is generally a faster way to traverse while ignoring the result rather than using mapM_
.
Since: 1.0.0
Exceptions
If any one of the workers dies with an exception, even if that exception is asynchronous, it will be re-thrown in the scheduling thread.
>>>
let didAWorkerDie = handleJust asyncExceptionFromException (return . (== ThreadKilled)) . fmap or
>>>
:t didAWorkerDie
didAWorkerDie :: Foldable t => IO (t Bool) -> IO Bool
>>>
didAWorkerDie $ withScheduler Par $ \ s -> scheduleWork s $ pure False
False
>>>
didAWorkerDie $ withScheduler Par $ \ s -> scheduleWork s $ myThreadId >>= killThread >> pure False
True
>>>
withScheduler Par $ \ s -> scheduleWork s $ myThreadId >>= killThread >> pure False
*** Exception: thread killed
data MutexException Source #
Exception that gets thrown whenever concurrent access is attempted to the WorkerStates
Since: 1.4.0
Instances
Exception MutexException Source # | |
Defined in Control.Scheduler.Types | |
Show MutexException Source # | |
Defined in Control.Scheduler.Types showsPrec :: Int -> MutexException -> ShowS # show :: MutexException -> String # showList :: [MutexException] -> ShowS # | |
Eq MutexException Source # | |
Defined in Control.Scheduler.Types (==) :: MutexException -> MutexException -> Bool # (/=) :: MutexException -> MutexException -> Bool # |