{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE TypeApplications #-}

module Unison.Codebase.Watch where

import Control.Concurrent
  ( forkIO,
    killThread,
    threadDelay,
  )
import Control.Concurrent.STM qualified as STM
import Data.Map qualified as Map
import Data.Time.Clock
  ( UTCTime,
    diffUTCTime,
  )
import System.FSNotify (Event (Added, Modified))
import System.FSNotify qualified as FSNotify
import Unison.Prelude
import Unison.Util.TQueue (TQueue)
import Unison.Util.TQueue qualified as TQueue
import UnliftIO.Exception (catch)
import UnliftIO.IORef
  ( newIORef,
    readIORef,
    writeIORef,
  )
import UnliftIO.MVar
  ( newEmptyMVar,
    putMVar,
    takeMVar,
    tryPutMVar,
    tryTakeMVar,
  )
import UnliftIO.STM (atomically)

-- | Run @act@ repeatedly until it yields a 'Just', then return the wrapped
-- value. Every 'Nothing' result causes @act@ to be run again, so this only
-- terminates once @act@ produces a 'Just' (or the monad short-circuits).
untilJust :: (Monad m) => m (Maybe a) -> m a
untilJust act = act >>= maybe (untilJust act) return

-- | Start watching directory @d@ for file changes.
--
-- Returns a pair @(cleanup, nextEvent)@:
--
-- * @cleanup@ takes the cancellation action produced by 'FSNotify.watchDir'
--   out of its 'MVar', runs it, and then kills the thread that was forked to
--   host the watch manager. Because it uses 'takeMVar', it blocks until the
--   forked thread has stored that action.
-- * @nextEvent@ blocks until an event is available and returns the path and
--   timestamp carried by that event.
--
-- Only 'Added' and 'Modified' events flagged 'FSNotify.IsFile' are recorded.
-- A single 'MVar' slot is used, so an event that has not been consumed yet is
-- replaced by a newer one.
watchDirectory' ::
  forall m. (MonadIO m) => FilePath -> m (IO (), IO (FilePath, UTCTime))
watchDirectory' d = do
  mvar <- newEmptyMVar
  let handler :: Event -> IO ()
      handler e = case e of
        Added fp t FSNotify.IsFile -> publish fp t
        Modified fp t FSNotify.IsFile -> publish fp t
        _ -> pure ()
        where
          -- Discard any event nobody has read yet, then store the new one.
          publish :: FilePath -> UTCTime -> IO ()
          publish fp t = do
            _ <- tryTakeMVar mvar
            putMVar mvar (fp, t)
  -- janky: used to store the cancellation action returned
  -- by `watchDir`, which is created asynchronously
  cleanupRef <- newEmptyMVar
  -- we don't like FSNotify's debouncing (it seems to drop later events)
  -- so we will be doing our own instead
  let config = FSNotify.defaultConfig
  cancel <- liftIO $
    forkIO $
      FSNotify.withManagerConf config $ \mgr -> do
        -- If `watchDir` fails via the `Alternative IO` fallback, use a no-op
        -- cancellation action instead.
        cancelInner <-
          FSNotify.watchDir mgr d (const True) handler <|> pure (pure ())
        putMVar cleanupRef cancelInner
        -- Keep this thread (and with it the manager callback) running until
        -- it is killed by `cleanup`.
        forever $ threadDelay 1000000
  let cleanup :: IO ()
      cleanup = join (takeMVar cleanupRef) >> killThread cancel
  pure (cleanup, takeMVar mvar)

-- | Collect the contents of @queue@ once enqueueing has paused.
--
-- First waits on 'TQueue.peek' (i.e. for at least one element), then
-- repeatedly samples 'TQueue.enqueueCount' before and after sleeping for
-- @minPauseµsec@ (passed to 'threadDelay', so in microseconds). As soon as the
-- two samples agree, the queue is flushed and its contents are returned.
collectUntilPause :: forall a. TQueue a -> Int -> IO [a]
collectUntilPause queue minPauseµsec = do
  -- 1. wait for at least one element in the queue
  void . atomically $ TQueue.peek queue
  -- 2. keep sleeping until a full pause passes with no new enqueues
  drainWhenQuiet
  where
    drainWhenQuiet :: IO [a]
    drainWhenQuiet = do
      before <- atomically $ TQueue.enqueueCount queue
      threadDelay minPauseµsec
      after <- atomically $ TQueue.enqueueCount queue
      -- if nothing new is on the queue, then return the contents
      if before == after
        then atomically $ TQueue.flush queue
        else drainWhenQuiet

-- | Watch @dir@ for changes to files accepted by @allow@ and yield their
-- contents.
--
-- Returns a pair @(cancel, await)@:
--
-- * @cancel@ runs the cleanup action from 'watchDirectory'' and kills the
--   thread that feeds events into the debounce queue.
-- * @await@ blocks until a change to an allowed file should be reported and
--   returns the file's path together with its contents as read by 'readUtf8'.
--
-- Events are debounced with 'collectUntilPause' using a 50ms pause, and only
-- the last event of each collected batch is processed. An event is skipped
-- when the file's contents equal the contents last reported for that path and
-- less than 0.5s separate the two event timestamps. An 'IOException' raised
-- while processing an event causes that event to be skipped.
watchDirectory ::
  forall m.
  (MonadIO m) =>
  FilePath ->
  (FilePath -> Bool) ->
  m (IO (), IO (FilePath, Text))
watchDirectory dir allow = do
  -- Last reported (contents, event time) per file path.
  previousFiles <- newIORef Map.empty
  (cancelWatch, watcher) <- watchDirectory' dir
  let process :: FilePath -> UTCTime -> IO (Maybe (FilePath, Text))
      process file t
        | allow file = catch readIfChanged onReadError
        | otherwise = pure Nothing
        where
          -- Sometimes we notice a change and try to read a file while it's being written.
          -- This typically occurs when UCM is writing to the scratch file and can be
          -- ignored anyways.
          onReadError :: IOException -> IO (Maybe (FilePath, Text))
          onReadError _ = pure Nothing
          readIfChanged :: IO (Maybe (FilePath, Text))
          readIfChanged = do
            contents <- readUtf8 file
            prevs <- readIORef previousFiles
            case Map.lookup file prevs of
              -- if the file's contents haven't changed and less than .5s
              -- have elapsed, wait for the next update
              Just (contents0, t0)
                | contents == contents0 && (t `diffUTCTime` t0) < 0.5 ->
                    pure Nothing
              _ -> do
                writeIORef previousFiles (Map.insert file (contents, t) prevs)
                pure (Just (file, contents))
  queue <- TQueue.newIO
  gate <- liftIO newEmptyMVar
  -- We spawn a separate thread to siphon the file change events
  -- into a queue, which can be debounced using `collectUntilPause`
  enqueuer <- liftIO . forkIO $ do
    takeMVar gate -- wait until gate open before starting
    forever $ do
      event@(file, _) <- watcher
      when (allow file) $
        STM.atomically $
          TQueue.enqueue queue event
  -- NOTE: within this function `pending` starts empty and is only ever
  -- overwritten with its own tail, so the non-empty branch below is not
  -- reachable unless something else populates it.
  pending <- newIORef []
  let await :: IO (FilePath, Text)
      await =
        untilJust $
          readIORef pending >>= \case
            [] -> do
              -- open the gate (a no-op if it has already been opened)
              void $ tryPutMVar gate ()
              -- this debounces the events, waits for 50ms pause
              -- in file change events
              events <- collectUntilPause queue 50000
              -- traceM $ "Collected file change events" <> show events
              case reverse events of
                [] -> pure Nothing
                -- we pick the last of the events within the 50ms window
                -- TODO: consider enqueueing other events if there are
                -- multiple events for different files
                (latest : _) -> uncurry process latest
            ((file, t) : rest) -> do
              writeIORef pending rest
              process file t
      cancel :: IO ()
      cancel = cancelWatch >> killThread enqueuer
  pure (cancel, await)