module Unison.Util.TQueue where

import Control.Concurrent.Async qualified as Async
import Data.Sequence (Seq ((:<|)), (|>))
import Data.Sequence qualified as S
import Unison.Prelude
import UnliftIO.STM hiding (TQueue)

-- | A transactional FIFO queue.
--
-- The first field holds the current contents: 'enqueue' appends at the back
-- and the dequeue-style operations remove from the front. The second field
-- is a counter that 'enqueue' increments on every call and that no other
-- operation in this module modifies.
data TQueue a = TQueue (TVar (Seq a)) (TVar Word64)

-- | Create a new, empty queue whose enqueue counter starts at zero.
newIO :: forall a m. (MonadIO m) => m (TQueue a)
newIO = TQueue <$> newTVarIO mempty <*> newTVarIO 0

-- | The number of elements currently held in the queue.
size :: TQueue a -> STM Int
size (TQueue q _) = S.length <$> readTVar q

-- | Waits for this queue to reach a size <= @target@.
--
-- Consumes no elements; it's expected there is some other thread which is
-- consuming elements from the queue. While the queue is larger than
-- @target@ the transaction retries.
awaitSize :: Int -> TQueue a -> STM ()
awaitSize target q = do
  n <- size q
  if n <= target
    then pure ()
    else retrySTM

-- | Return the element at the front of the queue without removing it.
--
-- Retries the transaction while the queue is empty.
peek :: TQueue a -> STM a
peek (TQueue v _) =
  readTVar v >>= \case
    a :<| _ -> pure a
    _ -> retrySTM

-- | Remove and return the element at the front of the queue.
--
-- Retries the transaction while the queue is empty.
dequeue :: TQueue a -> STM a
dequeue (TQueue v _) =
  readTVar v >>= \case
    a :<| as -> writeTVar v as $> a
    _ -> retrySTM

-- | Push an element back onto the /front/ of the queue, so that it is the
-- next one returned by 'dequeue'.
--
-- The enqueue counter is left untouched.
undequeue :: TQueue a -> a -> STM ()
undequeue (TQueue v _) a = modifyTVar' v (a :<|)

-- | Non-blocking variant of 'dequeue'.
--
-- Removes and returns the front element wrapped in 'Just', or returns
-- 'Nothing' (leaving the queue unchanged) when the queue is empty.
tryDequeue :: TQueue a -> STM (Maybe a)
tryDequeue (TQueue v _) =
  readTVar v >>= \case
    a :<| as -> writeTVar v as $> Just a
    _ -> pure Nothing

-- | Remove and return exactly @n@ elements from the front of the queue, in
-- queue order.
--
-- Retries the transaction while fewer than @n@ elements are available; it
-- never returns a partial batch. For @n <= 0@ the size check always passes,
-- so the result is the empty list and the queue is left unchanged.
dequeueN :: TQueue a -> Int -> STM [a]
dequeueN (TQueue v _) n = do
  s <- readTVar v
  if S.length s >= n
    then do
      -- One split yields both the batch to return and the remaining queue.
      let (taken, rest) = S.splitAt n s
      writeTVar v rest
      pure (toList taken)
    else retrySTM

-- | Return the number of enqueues over the life of the queue.
--
-- This reads the counter that 'enqueue' increments; it is not the current
-- size of the queue (see 'size' for that).
enqueueCount :: TQueue a -> STM Word64
enqueueCount (TQueue _ count) = readTVar count

-- | Remove every element from the queue and return them in queue order.
--
-- Never retries: an empty queue yields the empty list.
flush :: TQueue a -> STM [a]
flush (TQueue v _) = do
  s <- readTVar v
  writeTVar v mempty
  pure (toList s)

-- | Append an element to the back of the queue and bump the enqueue counter.
enqueue :: TQueue a -> a -> STM ()
enqueue (TQueue v count) a = do
  modifyTVar' v (|> a)
  -- Strict modify so the counter does not accumulate a chain of (+ 1) thunks.
  modifyTVar' count (+ 1)

-- | Run two STM transactions concurrently, each in its own thread, and
-- return the result of whichever finishes first: 'Left' for the first
-- argument, 'Right' for the second. The other thread is cancelled.
--
-- Implemented with 'Async.race' rather than two bare 'Async.async' calls so
-- that neither thread can be leaked if the calling thread is interrupted
-- before the wait has been set up.
--
-- NOTE(review): the two transactions are not combined into one atomic
-- transaction, so presumably both may commit before the loser is cancelled,
-- in which case the loser's result is discarded -- confirm callers tolerate
-- this.
raceIO :: (MonadIO m) => STM a -> STM b -> m (Either a b)
raceIO a b = liftIO (Async.race (atomically a) (atomically b))

-- | Return, without removing them, all elements from the front of the queue
-- up to but not including the first one not satisfying @cond@.
--
-- Never retries: if every element satisfies @cond@ the whole contents are
-- returned, and an empty queue yields the empty list.
tryPeekWhile :: (a -> Bool) -> TQueue a -> STM [a]
tryPeekWhile cond (TQueue v _) = toList . S.takeWhileL cond <$> readTVar v

-- | Block until at least one element is enqueued not satisfying @cond@,
-- then remove and return the prefix before that.
--
-- The first element that fails @cond@, and everything after it, stays in
-- the queue. While every element satisfies @cond@ (including when the queue
-- is empty) the transaction retries.
takeWhile :: (a -> Bool) -> TQueue a -> STM [a]
takeWhile cond (TQueue v _) = do
  s <- readTVar v
  let (left, right) = S.spanl cond s
  if null right
    then retrySTM
    else writeTVar v right $> toList left

-- | Block until at least one element is enqueued not satisfying @cond@,
-- then return the prefix before that without removing anything.
--
-- While every element satisfies @cond@ (including when the queue is empty)
-- the transaction retries.
peekWhile :: (a -> Bool) -> TQueue a -> STM [a]
peekWhile cond (TQueue v _) = do
  s <- readTVar v
  let (left, right) = S.spanl cond s
  if null right
    then retrySTM
    else pure (toList left)