module Unison.Util.TQueue where
import Control.Concurrent.Async qualified as Async
import Data.Sequence (Seq ((:<|)), (|>))
import Data.Sequence qualified as S
import Unison.Prelude
import UnliftIO.STM hiding (TQueue)
-- | A FIFO queue for use inside 'STM'.
--
-- The first field holds the queued elements: new elements are appended at the
-- back and removed from the front. The second field is a counter that is
-- seeded with the initial length of the queue and incremented on every
-- 'enqueue'; no operation in this module decrements it.
data TQueue a = TQueue (TVar (Seq a)) (TVar Word64)
-- | Create a queue that already contains the given elements, in order.
--
-- The enqueue counter starts at the number of initial elements, so
-- pre-populated elements count as having been enqueued.
prepopulatedIO :: forall a m. (MonadIO m) => Seq a -> m (TQueue a)
prepopulatedIO as =
  TQueue
    <$> newTVarIO as
    <*> newTVarIO (fromIntegral (S.length as))
-- | Create an empty queue (with an enqueue count of zero).
newIO :: forall a m. (MonadIO m) => m (TQueue a)
newIO = prepopulatedIO mempty
-- | The number of elements currently in the queue.
size :: TQueue a -> STM Int
size (TQueue q _) = S.length <$> readTVar q
-- | Block (retry the transaction) until the queue holds at most @target@
-- elements.
--
-- This removes nothing from the queue itself, so it can only make progress
-- once some other transaction has shrunk the queue.
awaitSize :: Int -> TQueue a -> STM ()
awaitSize target q = do
  n <- size q
  checkSTM (n <= target)
-- | Return the element at the front of the queue without removing it.
-- Retries while the queue is empty.
peek :: TQueue a -> STM a
peek (TQueue v _) =
  readTVar v >>= \case
    a :<| _ -> pure a
    S.Empty -> retrySTM
-- | Remove and return the element at the front of the queue.
-- Retries while the queue is empty.
dequeue :: TQueue a -> STM a
dequeue (TQueue v _) =
  readTVar v >>= \case
    a :<| rest -> writeTVar v rest $> a
    S.Empty -> retrySTM
-- | Push an element back onto the /front/ of the queue.
--
-- The enqueue counter is left untouched.
undequeue :: TQueue a -> a -> STM ()
undequeue (TQueue v _) a =
  -- Strict modify, matching 'enqueue', so no thunk is left in the TVar.
  modifyTVar' v (a :<|)
-- | Non-blocking 'dequeue': remove and return the front element, or return
-- 'Nothing' immediately if the queue is empty.
tryDequeue :: TQueue a -> STM (Maybe a)
tryDequeue (TQueue v _) =
  readTVar v >>= \case
    a :<| rest -> writeTVar v rest $> Just a
    S.Empty -> pure Nothing
-- | Remove and return the first @n@ elements of the queue, in order.
-- Retries until the queue holds at least @n@ elements.
dequeueN :: TQueue a -> Int -> STM [a]
dequeueN (TQueue v _) n = do
  s <- readTVar v
  if S.length s >= n
    then do
      -- One split yields both the taken prefix and the remaining queue.
      let (taken, rest) = S.splitAt n s
      writeTVar v rest
      pure (toList taken)
    else retrySTM
-- | Read the queue's enqueue counter (the second field of 'TQueue').
enqueueCount :: TQueue a -> STM Word64
enqueueCount (TQueue _ count) = readTVar count
-- | Remove every element from the queue and return them in queue order.
-- Never retries; an empty queue yields @[]@.
flush :: TQueue a -> STM [a]
flush (TQueue v _) = do
  s <- readTVar v
  writeTVar v mempty
  pure (toList s)
-- | Append an element to the back of the queue and bump the enqueue counter.
enqueue :: TQueue a -> a -> STM ()
enqueue (TQueue v count) a = do
  modifyTVar' v (|> a)
  modifyTVar' count (+ 1)
-- | Run two STM transactions concurrently, each in its own thread, and return
-- the result of whichever commits first ('Left' for the first argument,
-- 'Right' for the second). The other transaction's thread is cancelled.
--
-- Implemented with 'Async.race' rather than two manual 'Async.async' calls,
-- so a thread cannot be leaked if the caller receives an asynchronous
-- exception between spawning the workers and waiting on them.
raceIO :: (MonadIO m) => STM a -> STM b -> m (Either a b)
raceIO a b = liftIO (Async.race (atomically a) (atomically b))
-- | Return, without removing them, the longest prefix of the queue whose
-- elements all satisfy @cond@. Never retries; the result may be empty.
tryPeekWhile :: (a -> Bool) -> TQueue a -> STM [a]
tryPeekWhile cond (TQueue v _) =
  toList . S.takeWhileL cond <$> readTVar v
-- | Remove and return the longest prefix of the queue whose elements all
-- satisfy @cond@.
--
-- Retries until the queue contains at least one element that does /not/
-- satisfy @cond@ (so an empty queue, or one where every element passes,
-- blocks). That first failing element stays at the front of the queue.
takeWhile :: (a -> Bool) -> TQueue a -> STM [a]
takeWhile cond (TQueue v _) = do
  s <- readTVar v
  let (matching, remainder) = S.spanl cond s
  if null remainder
    then retrySTM
    else writeTVar v remainder $> toList matching
-- | Return, without removing them, the longest prefix of the queue whose
-- elements all satisfy @cond@.
--
-- Retries until the queue contains at least one element that does /not/
-- satisfy @cond@ (so an empty queue, or one where every element passes,
-- blocks). The queue is left unmodified.
peekWhile :: (a -> Bool) -> TQueue a -> STM [a]
peekWhile cond (TQueue v _) = do
  s <- readTVar v
  let (matching, remainder) = S.spanl cond s
  if null remainder
    then retrySTM
    else pure (toList matching)