{-# LANGUAGE DataKinds #-}
{-# LANGUAGE TypeOperators #-}

module Unison.Share.Sync
  ( -- ** Get causal hash by path
    getCausalHashByPath,
    GetCausalHashByPathError (..),

    -- ** Upload
    uploadEntities,

    -- ** Pull/Download
    pull,
    PullError (..),
    downloadEntities,
  )
where

import Control.Concurrent.STM
import Control.Lens
import Control.Monad.Except
import Control.Monad.Reader (ask)
import Control.Monad.Trans.Reader (ReaderT, runReaderT)
import Control.Monad.Trans.Reader qualified as Reader
import Data.Map qualified as Map
import Data.Map.NonEmpty (NEMap)
import Data.Map.NonEmpty qualified as NEMap
import Data.Proxy
import Data.Set qualified as Set
import Data.Set.NonEmpty (NESet)
import Data.Set.NonEmpty qualified as NESet
import Data.Text.Lazy qualified as Text.Lazy
import Data.Text.Lazy.Encoding qualified as Text.Lazy
import GHC.IO (unsafePerformIO)
import Ki qualified
import Network.HTTP.Client qualified as Http.Client
import Network.HTTP.Types qualified as HTTP
import Servant.API qualified as Servant ((:<|>) (..), (:>))
import Servant.Client (BaseUrl)
import Servant.Client qualified as Servant
import System.Environment (lookupEnv)
import U.Codebase.HashTags (CausalHash)
import U.Codebase.Sqlite.HashHandle qualified as HH
import U.Codebase.Sqlite.Queries qualified as Q
import U.Codebase.Sqlite.V2.HashHandle (v2HashHandle)
import Unison.Auth.HTTPClient (AuthenticatedHttpClient)
import Unison.Auth.HTTPClient qualified as Auth
import Unison.Cli.Monad (Cli)
import Unison.Cli.Monad qualified as Cli
import Unison.Codebase qualified as Codebase
import Unison.Debug qualified as Debug
import Unison.Hash32 (Hash32)
import Unison.Prelude
import Unison.Share.API.Hash qualified as Share
import Unison.Share.ExpectedHashMismatches (expectedCausalHashMismatches, expectedComponentHashMismatches)
import Unison.Share.Sync.Types
import Unison.Sqlite qualified as Sqlite
import Unison.Sync.API qualified as Share (API)
import Unison.Sync.Common (entityToTempEntity, expectEntity, hash32ToCausalHash)
import Unison.Sync.EntityValidation qualified as EV
import Unison.Sync.Types qualified as Share
import Unison.Util.Monoid (foldMapM)

------------------------------------------------------------------------------------------------------------------------
-- Pile of constants

-- | The maximum number of downloader threads, during a pull.
maxSimultaneousPullDownloaders :: Int
maxSimultaneousPullDownloaders :: Int
maxSimultaneousPullDownloaders = IO Int -> Int
forall a. IO a -> a
unsafePerformIO (IO Int -> Int) -> IO Int -> Int
forall a b. (a -> b) -> a -> b
$ do
  String -> IO (Maybe String)
lookupEnv String
"UNISON_PULL_WORKERS" IO (Maybe String) -> (Maybe String -> Int) -> IO Int
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
    Just String
n -> String -> Int
forall a. Read a => String -> a
read String
n
    Maybe String
Nothing -> Int
5
{-# NOINLINE maxSimultaneousPullDownloaders #-}

-- | The maximum number of push workers at a time. Each push worker reads from the database and uploads entities.
-- Share currently parallelizes on it's own in the backend, and any more than one push worker
-- just results in serialization conflicts which slow things down.
maxSimultaneousPushWorkers :: Int
maxSimultaneousPushWorkers :: Int
maxSimultaneousPushWorkers = IO Int -> Int
forall a. IO a -> a
unsafePerformIO (IO Int -> Int) -> IO Int -> Int
forall a b. (a -> b) -> a -> b
$ do
  String -> IO (Maybe String)
lookupEnv String
"UNISON_PUSH_WORKERS" IO (Maybe String) -> (Maybe String -> Int) -> IO Int
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
    Just String
n -> String -> Int
forall a. Read a => String -> a
read String
n
    Maybe String
Nothing -> Int
1
{-# NOINLINE maxSimultaneousPushWorkers #-}

syncChunkSize :: Int
syncChunkSize :: Int
syncChunkSize = IO Int -> Int
forall a. IO a -> a
unsafePerformIO (IO Int -> Int) -> IO Int -> Int
forall a b. (a -> b) -> a -> b
$ do
  String -> IO (Maybe String)
lookupEnv String
"UNISON_SYNC_CHUNK_SIZE" IO (Maybe String) -> (Maybe String -> Int) -> IO Int
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
    Just String
n -> String -> Int
forall a. Read a => String -> a
read String
n
    Maybe String
Nothing -> Int
50
{-# NOINLINE syncChunkSize #-}

------------------------------------------------------------------------------------------------------------------------
-- Pull

pull ::
  -- | The Unison Share URL.
  BaseUrl ->
  -- | The repo+path to pull from.
  Share.Path ->
  -- | Callback that's given a number of entities we just downloaded.
  (Int -> IO ()) ->
  Cli (Either (SyncError PullError) CausalHash)
pull :: BaseUrl
-> Path
-> (Int -> IO ())
-> Cli (Either (SyncError PullError) CausalHash)
pull BaseUrl
unisonShareUrl Path
repoPath Int -> IO ()
downloadedCallback =
  BaseUrl
-> Path
-> Cli
     (Either (SyncError GetCausalHashByPathError) (Maybe HashJWT))
getCausalHashByPath BaseUrl
unisonShareUrl Path
repoPath Cli (Either (SyncError GetCausalHashByPathError) (Maybe HashJWT))
-> (Either (SyncError GetCausalHashByPathError) (Maybe HashJWT)
    -> Cli (Either (SyncError PullError) CausalHash))
-> Cli (Either (SyncError PullError) CausalHash)
forall a b. Cli a -> (a -> Cli b) -> Cli b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    Left SyncError GetCausalHashByPathError
err -> Either (SyncError PullError) CausalHash
-> Cli (Either (SyncError PullError) CausalHash)
forall a. a -> Cli a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (SyncError PullError -> Either (SyncError PullError) CausalHash
forall a b. a -> Either a b
Left (GetCausalHashByPathError -> PullError
PullError'GetCausalHash (GetCausalHashByPathError -> PullError)
-> SyncError GetCausalHashByPathError -> SyncError PullError
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> SyncError GetCausalHashByPathError
err))
    -- There's nothing at the remote path, so there's no causal to pull.
    Right Maybe HashJWT
Nothing -> Either (SyncError PullError) CausalHash
-> Cli (Either (SyncError PullError) CausalHash)
forall a. a -> Cli a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (SyncError PullError -> Either (SyncError PullError) CausalHash
forall a b. a -> Either a b
Left (PullError -> SyncError PullError
forall e. e -> SyncError e
SyncError (Path -> PullError
PullError'NoHistoryAtPath Path
repoPath)))
    Right (Just HashJWT
hashJwt) ->
      BaseUrl
-> RepoInfo
-> HashJWT
-> (Int -> IO ())
-> Cli (Either (SyncError DownloadEntitiesError) ())
downloadEntities BaseUrl
unisonShareUrl (Path -> RepoInfo
Share.pathRepoInfo Path
repoPath) HashJWT
hashJwt Int -> IO ()
downloadedCallback Cli (Either (SyncError DownloadEntitiesError) ())
-> (Either (SyncError DownloadEntitiesError) ()
    -> Either (SyncError PullError) CausalHash)
-> Cli (Either (SyncError PullError) CausalHash)
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
        Left SyncError DownloadEntitiesError
err -> SyncError PullError -> Either (SyncError PullError) CausalHash
forall a b. a -> Either a b
Left (DownloadEntitiesError -> PullError
PullError'DownloadEntities (DownloadEntitiesError -> PullError)
-> SyncError DownloadEntitiesError -> SyncError PullError
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> SyncError DownloadEntitiesError
err)
        Right () -> CausalHash -> Either (SyncError PullError) CausalHash
forall a b. b -> Either a b
Right (Hash32 -> CausalHash
hash32ToCausalHash (HashJWT -> Hash32
Share.hashJWTHash HashJWT
hashJwt))

------------------------------------------------------------------------------------------------------------------------
-- Download entities

downloadEntities ::
  -- | The Unison Share URL.
  BaseUrl ->
  -- | The repo to download from.
  Share.RepoInfo ->
  -- | The hash to download.
  Share.HashJWT ->
  -- | Callback that's given a number of entities we just downloaded.
  (Int -> IO ()) ->
  Cli (Either (SyncError Share.DownloadEntitiesError) ())
downloadEntities :: BaseUrl
-> RepoInfo
-> HashJWT
-> (Int -> IO ())
-> Cli (Either (SyncError DownloadEntitiesError) ())
downloadEntities BaseUrl
unisonShareUrl RepoInfo
repoInfo HashJWT
hashJwt Int -> IO ()
downloadedCallback = do
  Cli.Env {authHTTPClient, codebase} <- Cli Env
forall r (m :: * -> *). MonadReader r m => m r
ask

  Cli.label \forall void.
Either (SyncError DownloadEntitiesError) () -> Cli void
done -> do
    let failed :: SyncError Share.DownloadEntitiesError -> Cli void
        failed :: forall void. SyncError DownloadEntitiesError -> Cli void
failed = Either (SyncError DownloadEntitiesError) () -> Cli void
forall void.
Either (SyncError DownloadEntitiesError) () -> Cli void
done (Either (SyncError DownloadEntitiesError) () -> Cli void)
-> (SyncError DownloadEntitiesError
    -> Either (SyncError DownloadEntitiesError) ())
-> SyncError DownloadEntitiesError
-> Cli void
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncError DownloadEntitiesError
-> Either (SyncError DownloadEntitiesError) ()
forall a b. a -> Either a b
Left

    let hash :: Hash32
hash = HashJWT -> Hash32
Share.hashJWTHash HashJWT
hashJwt

    maybeTempEntities <-
      Transaction (Maybe EntityLocation) -> Cli (Maybe EntityLocation)
forall a. Transaction a -> Cli a
Cli.runTransaction (Hash32 -> Transaction (Maybe EntityLocation)
Q.entityLocation Hash32
hash) Cli (Maybe EntityLocation)
-> (Maybe EntityLocation -> Cli (Maybe (NESet Hash32)))
-> Cli (Maybe (NESet Hash32))
forall a b. Cli a -> (a -> Cli b) -> Cli b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        Just EntityLocation
Q.EntityInMainStorage -> Maybe (NESet Hash32) -> Cli (Maybe (NESet Hash32))
forall a. a -> Cli a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (NESet Hash32)
forall a. Maybe a
Nothing
        Just EntityLocation
Q.EntityInTempStorage -> Maybe (NESet Hash32) -> Cli (Maybe (NESet Hash32))
forall a. a -> Cli a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (NESet Hash32 -> Maybe (NESet Hash32)
forall a. a -> Maybe a
Just (Hash32 -> NESet Hash32
forall a. a -> NESet a
NESet.singleton Hash32
hash))
        Maybe EntityLocation
Nothing -> do
          let request :: IO (Either CodeserverTransportError DownloadEntitiesResponse)
request =
                AuthenticatedHttpClient
-> BaseUrl
-> DownloadEntitiesRequest
-> IO (Either CodeserverTransportError DownloadEntitiesResponse)
httpDownloadEntities
                  AuthenticatedHttpClient
authHTTPClient
                  BaseUrl
unisonShareUrl
                  Share.DownloadEntitiesRequest {RepoInfo
repoInfo :: RepoInfo
repoInfo :: RepoInfo
repoInfo, hashes :: NESet HashJWT
hashes = HashJWT -> NESet HashJWT
forall a. a -> NESet a
NESet.singleton HashJWT
hashJwt}
          entities <-
            IO (Either CodeserverTransportError DownloadEntitiesResponse)
-> Cli (Either CodeserverTransportError DownloadEntitiesResponse)
forall a. IO a -> Cli a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (Either CodeserverTransportError DownloadEntitiesResponse)
request Cli (Either CodeserverTransportError DownloadEntitiesResponse)
-> (Either CodeserverTransportError DownloadEntitiesResponse
    -> Cli (NEMap Hash32 (Entity Text Hash32 HashJWT)))
-> Cli (NEMap Hash32 (Entity Text Hash32 HashJWT))
forall a b. Cli a -> (a -> Cli b) -> Cli b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
              Left CodeserverTransportError
err -> SyncError DownloadEntitiesError
-> Cli (NEMap Hash32 (Entity Text Hash32 HashJWT))
forall void. SyncError DownloadEntitiesError -> Cli void
failed (CodeserverTransportError -> SyncError DownloadEntitiesError
forall e. CodeserverTransportError -> SyncError e
TransportError CodeserverTransportError
err)
              Right (Share.DownloadEntitiesFailure DownloadEntitiesError
err) -> SyncError DownloadEntitiesError
-> Cli (NEMap Hash32 (Entity Text Hash32 HashJWT))
forall void. SyncError DownloadEntitiesError -> Cli void
failed (DownloadEntitiesError -> SyncError DownloadEntitiesError
forall e. e -> SyncError e
SyncError DownloadEntitiesError
err)
              Right (Share.DownloadEntitiesSuccess NEMap Hash32 (Entity Text Hash32 HashJWT)
entities) -> NEMap Hash32 (Entity Text Hash32 HashJWT)
-> Cli (NEMap Hash32 (Entity Text Hash32 HashJWT))
forall a. a -> Cli a
forall (f :: * -> *) a. Applicative f => a -> f a
pure NEMap Hash32 (Entity Text Hash32 HashJWT)
entities
          case validateEntities entities of
            Left EntityValidationError
err -> SyncError DownloadEntitiesError -> Cli ()
forall void. SyncError DownloadEntitiesError -> Cli void
failed (SyncError DownloadEntitiesError -> Cli ())
-> (EntityValidationError -> SyncError DownloadEntitiesError)
-> EntityValidationError
-> Cli ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DownloadEntitiesError -> SyncError DownloadEntitiesError
forall e. e -> SyncError e
SyncError (DownloadEntitiesError -> SyncError DownloadEntitiesError)
-> (EntityValidationError -> DownloadEntitiesError)
-> EntityValidationError
-> SyncError DownloadEntitiesError
forall b c a. (b -> c) -> (a -> b) -> a -> c
. EntityValidationError -> DownloadEntitiesError
Share.DownloadEntitiesEntityValidationFailure (EntityValidationError -> Cli ())
-> EntityValidationError -> Cli ()
forall a b. (a -> b) -> a -> b
$ EntityValidationError
err
            Right () -> () -> Cli ()
forall a. a -> Cli a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
          tempEntities <- Cli.runTransaction (insertEntities entities)
          liftIO (downloadedCallback 1)
          pure (NESet.nonEmptySet tempEntities)

    whenJust maybeTempEntities \NESet Hash32
tempEntities -> do
      let doCompleteTempEntities :: IO (Either (SyncError DownloadEntitiesError) ())
doCompleteTempEntities =
            AuthenticatedHttpClient
-> BaseUrl
-> (forall a. ((forall x. Transaction x -> IO x) -> IO a) -> IO a)
-> RepoInfo
-> (Int -> IO ())
-> NESet Hash32
-> IO (Either (SyncError DownloadEntitiesError) ())
completeTempEntities
              AuthenticatedHttpClient
authHTTPClient
              BaseUrl
unisonShareUrl
              ( \(forall x. Transaction x -> IO x) -> IO a
action ->
                  Codebase IO Symbol Ann -> forall x. (Connection -> IO x) -> IO x
forall (m :: * -> *) v a.
Codebase m v a -> forall x. (Connection -> m x) -> m x
Codebase.withConnection Codebase IO Symbol Ann
codebase \Connection
conn ->
                    (forall x. Transaction x -> IO x) -> IO a
action (Connection -> Transaction x -> IO x
forall (m :: * -> *) a.
(MonadIO m, HasCallStack) =>
Connection -> Transaction a -> m a
Sqlite.runTransaction Connection
conn)
              )
              RepoInfo
repoInfo
              Int -> IO ()
downloadedCallback
              NESet Hash32
tempEntities
      IO (Either (SyncError DownloadEntitiesError) ())
-> Cli (Either (SyncError DownloadEntitiesError) ())
forall a. IO a -> Cli a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (Either (SyncError DownloadEntitiesError) ())
doCompleteTempEntities Cli (Either (SyncError DownloadEntitiesError) ())
-> (Cli (Either (SyncError DownloadEntitiesError) ()) -> Cli ())
-> Cli ()
forall a b. a -> (a -> b) -> b
& (SyncError DownloadEntitiesError -> Cli ())
-> Cli (Either (SyncError DownloadEntitiesError) ()) -> Cli ()
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> m (Either a b) -> m b
onLeftM \SyncError DownloadEntitiesError
err ->
        SyncError DownloadEntitiesError -> Cli ()
forall void. SyncError DownloadEntitiesError -> Cli void
failed SyncError DownloadEntitiesError
err
    -- Since we may have just inserted and then deleted many temp entities, we attempt to recover some disk space by
    -- vacuuming after each pull. If the vacuum fails due to another open transaction on this connection, that's ok,
    -- we'll try vacuuming again next pull.
    _success <- liftIO (Codebase.withConnection codebase Sqlite.vacuum)
    pure (Right ())

-- | Validates the provided entities if and only if the environment variable `UNISON_ENTITY_VALIDATION` is set to "true".
validateEntities :: NEMap Hash32 (Share.Entity Text Hash32 Share.HashJWT) -> Either Share.EntityValidationError ()
validateEntities :: NEMap Hash32 (Entity Text Hash32 HashJWT)
-> Either EntityValidationError ()
validateEntities NEMap Hash32 (Entity Text Hash32 HashJWT)
entities =
  Bool
-> Either EntityValidationError ()
-> Either EntityValidationError ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
shouldValidateEntities (Either EntityValidationError ()
 -> Either EntityValidationError ())
-> Either EntityValidationError ()
-> Either EntityValidationError ()
forall a b. (a -> b) -> a -> b
$ do
    Map Hash32 (Entity Text Hash32 HashJWT)
-> (Hash32
    -> Entity Text Hash32 HashJWT -> Either EntityValidationError ())
-> Either EntityValidationError ()
forall i (t :: * -> *) (f :: * -> *) a b.
(FoldableWithIndex i t, Applicative f) =>
t a -> (i -> a -> f b) -> f ()
ifor_ (NEMap Hash32 (Entity Text Hash32 HashJWT)
-> Map Hash32 (Entity Text Hash32 HashJWT)
forall k a. NEMap k a -> Map k a
NEMap.toMap NEMap Hash32 (Entity Text Hash32 HashJWT)
entities) \Hash32
hash Entity Text Hash32 HashJWT
entity -> do
      let entityWithHashes :: Entity Text Hash32 Hash32
entityWithHashes = Entity Text Hash32 HashJWT
entity Entity Text Hash32 HashJWT
-> (Entity Text Hash32 HashJWT -> Entity Text Hash32 Hash32)
-> Entity Text Hash32 Hash32
forall a b. a -> (a -> b) -> b
& (HashJWT -> Identity Hash32)
-> Entity Text Hash32 HashJWT
-> Identity (Entity Text Hash32 Hash32)
forall (m :: * -> *) hash' hash text noSyncHash.
(Applicative m, Ord hash') =>
(hash -> m hash')
-> Entity text noSyncHash hash -> m (Entity text noSyncHash hash')
Share.entityHashes_ ((HashJWT -> Identity Hash32)
 -> Entity Text Hash32 HashJWT
 -> Identity (Entity Text Hash32 Hash32))
-> (HashJWT -> Hash32)
-> Entity Text Hash32 HashJWT
-> Entity Text Hash32 Hash32
forall s t a b. ASetter s t a b -> (a -> b) -> s -> t
%~ HashJWT -> Hash32
Share.hashJWTHash
      case Hash32
-> Entity Text Hash32 Hash32
-> Maybe (Either HashingFailure EntityValidationError)
EV.validateEntity Hash32
hash Entity Text Hash32 Hash32
entityWithHashes of
        Maybe (Either HashingFailure EntityValidationError)
Nothing -> () -> Either EntityValidationError ()
forall a. a -> Either EntityValidationError a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
        Just (Left err :: HashingFailure
err@(HH.IncompleteElementOrderingError ComponentHash
_componentHash)) ->
          Either HashingFailure (Either EntityValidationError ())
-> Either EntityValidationError ()
forall a. HasCallStack => Either HashingFailure a -> a
HH.crashOnHashingFailure (HashingFailure
-> Either HashingFailure (Either EntityValidationError ())
forall a b. a -> Either a b
Left HashingFailure
err)
        Just (Right err :: EntityValidationError
err@(Share.EntityHashMismatch EntityType
et (Share.HashMismatchForEntity {Hash32
supplied :: Hash32
supplied :: HashMismatchForEntity -> Hash32
supplied, Hash32
computed :: Hash32
computed :: HashMismatchForEntity -> Hash32
computed}))) ->
          let expectedMismatches :: Map Hash32 Hash32
expectedMismatches = case EntityType
et of
                EntityType
Share.TermComponentType -> Map Hash32 Hash32
expectedComponentHashMismatches
                EntityType
Share.DeclComponentType -> Map Hash32 Hash32
expectedComponentHashMismatches
                EntityType
Share.CausalType -> Map Hash32 Hash32
expectedCausalHashMismatches
                EntityType
_ -> Map Hash32 Hash32
forall a. Monoid a => a
mempty
           in case Hash32 -> Map Hash32 Hash32 -> Maybe Hash32
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup Hash32
supplied Map Hash32 Hash32
expectedMismatches of
                Just Hash32
expected
                  | Hash32
expected Hash32 -> Hash32 -> Bool
forall a. Eq a => a -> a -> Bool
== Hash32
computed -> () -> Either EntityValidationError ()
forall a. a -> Either EntityValidationError a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
                Maybe Hash32
_ -> do
                  EntityValidationError -> Either EntityValidationError ()
forall a b. a -> Either a b
Left EntityValidationError
err
        Just (Right EntityValidationError
err) -> do
          EntityValidationError -> Either EntityValidationError ()
forall a b. a -> Either a b
Left EntityValidationError
err

-- | Validate entities received from the server unless this flag is set to false.
validationEnvKey :: String
validationEnvKey :: String
validationEnvKey = String
"UNISON_ENTITY_VALIDATION"

shouldValidateEntities :: Bool
shouldValidateEntities :: Bool
shouldValidateEntities = IO Bool -> Bool
forall a. IO a -> a
unsafePerformIO (IO Bool -> Bool) -> IO Bool -> Bool
forall a b. (a -> b) -> a -> b
$ do
  String -> IO (Maybe String)
lookupEnv String
validationEnvKey IO (Maybe String) -> (Maybe String -> Bool) -> IO Bool
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
    Just String
"false" -> Bool
False
    Maybe String
_ -> Bool
True
{-# NOINLINE shouldValidateEntities #-}

type WorkerCount =
  TVar Int

newWorkerCount :: IO WorkerCount
newWorkerCount :: IO WorkerCount
newWorkerCount =
  Int -> IO WorkerCount
forall a. a -> IO (TVar a)
newTVarIO Int
0

recordWorking :: WorkerCount -> STM ()
recordWorking :: WorkerCount -> STM ()
recordWorking WorkerCount
sem =
  WorkerCount -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' WorkerCount
sem (Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)

recordNotWorking :: WorkerCount -> STM ()
recordNotWorking :: WorkerCount -> STM ()
recordNotWorking WorkerCount
sem =
  WorkerCount -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' WorkerCount
sem \Int
n -> Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1

-- What the dispatcher is to do
data DispatcherJob
  = DispatcherForkWorker (NESet Share.HashJWT)
  | DispatcherReturnEarlyBecauseDownloaderFailed (SyncError Share.DownloadEntitiesError)
  | DispatcherDone

-- | Finish downloading entities from Unison Share (or return the first failure to download something).
--
-- Precondition: the entities were *already* downloaded at some point in the past, and are now sitting in the
-- `temp_entity` table, waiting for their dependencies to arrive so they can be flushed to main storage.
completeTempEntities ::
  AuthenticatedHttpClient ->
  BaseUrl ->
  (forall a. ((forall x. Sqlite.Transaction x -> IO x) -> IO a) -> IO a) ->
  Share.RepoInfo ->
  (Int -> IO ()) ->
  NESet Hash32 ->
  IO (Either (SyncError Share.DownloadEntitiesError) ())
completeTempEntities :: AuthenticatedHttpClient
-> BaseUrl
-> (forall a. ((forall x. Transaction x -> IO x) -> IO a) -> IO a)
-> RepoInfo
-> (Int -> IO ())
-> NESet Hash32
-> IO (Either (SyncError DownloadEntitiesError) ())
completeTempEntities AuthenticatedHttpClient
httpClient BaseUrl
unisonShareUrl forall a. ((forall x. Transaction x -> IO x) -> IO a) -> IO a
connect RepoInfo
repoInfo Int -> IO ()
downloadedCallback NESet Hash32
initialNewTempEntities = do
  -- The set of hashes we still need to download
  hashesVar <- Set HashJWT -> IO (TVar (Set HashJWT))
forall a. a -> IO (TVar a)
newTVarIO Set HashJWT
forall a. Set a
Set.empty

  -- The set of hashes that we haven't inserted yet, but will soon, because we've committed to downloading them.
  uninsertedHashesVar <- newTVarIO Set.empty

  -- The entities payloads (along with the jwts that we used to download them) that we've downloaded
  entitiesQueue <- newTQueueIO

  -- The sets of new (at the time of inserting, anyway) temp entity rows, which we need to elaborate, then download.
  newTempEntitiesQueue <- newTQueueIO

  -- How many workers (downloader / inserter / elaborator) are currently doing stuff.
  workerCount <- newWorkerCount

  -- The first download error seen by a downloader, if any.
  downloaderFailedVar <- newEmptyTMVarIO

  -- Kick off the cycle of inserter->elaborator->dispatcher->downloader by giving the elaborator something to do
  atomically (writeTQueue newTempEntitiesQueue (Set.empty, Just initialNewTempEntities))

  Ki.scoped \Scope
scope -> do
    Scope -> IO Void -> IO ()
Ki.fork_ Scope
scope (TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
-> TQueue (Set HashJWT, Maybe (NESet Hash32))
-> WorkerCount
-> IO Void
inserter TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
entitiesQueue TQueue (Set HashJWT, Maybe (NESet Hash32))
newTempEntitiesQueue WorkerCount
workerCount)
    Scope -> IO Void -> IO ()
Ki.fork_ Scope
scope (TVar (Set HashJWT)
-> TVar (Set HashJWT)
-> TQueue (Set HashJWT, Maybe (NESet Hash32))
-> WorkerCount
-> IO Void
elaborator TVar (Set HashJWT)
hashesVar TVar (Set HashJWT)
uninsertedHashesVar TQueue (Set HashJWT, Maybe (NESet Hash32))
newTempEntitiesQueue WorkerCount
workerCount)
    TVar (Set HashJWT)
-> TVar (Set HashJWT)
-> TQueue
     (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
-> TQueue (Set HashJWT, Maybe (NESet Hash32))
-> WorkerCount
-> TMVar (SyncError DownloadEntitiesError)
-> IO (Either (SyncError DownloadEntitiesError) ())
dispatcher TVar (Set HashJWT)
hashesVar TVar (Set HashJWT)
uninsertedHashesVar TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
entitiesQueue TQueue (Set HashJWT, Maybe (NESet Hash32))
newTempEntitiesQueue WorkerCount
workerCount TMVar (SyncError DownloadEntitiesError)
downloaderFailedVar
  where
    -- Dispatcher thread: "dequeue" from `hashesVar`, fork one-shot downloaders.
    --
    -- We stop when either all of the following are true:
    --
    --   - There are no outstanding workers (downloaders, inserter, elaboraror)
    --   - The inserter thread doesn't have any outstanding work enqueued (in `entitiesQueue`)
    --   - The elaborator thread doesn't have any outstanding work enqueued (in `newTempEntitiesQueue`)
    --
    -- Or:
    --
    --   - Some downloader failed to download something
    dispatcher ::
      TVar (Set Share.HashJWT) ->
      TVar (Set Share.HashJWT) ->
      TQueue (NESet Share.HashJWT, NEMap Hash32 (Share.Entity Text Hash32 Share.HashJWT)) ->
      TQueue (Set Share.HashJWT, Maybe (NESet Hash32)) ->
      WorkerCount ->
      TMVar (SyncError Share.DownloadEntitiesError) ->
      IO (Either (SyncError Share.DownloadEntitiesError) ())
    dispatcher :: TVar (Set HashJWT)
-> TVar (Set HashJWT)
-> TQueue
     (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
-> TQueue (Set HashJWT, Maybe (NESet Hash32))
-> WorkerCount
-> TMVar (SyncError DownloadEntitiesError)
-> IO (Either (SyncError DownloadEntitiesError) ())
dispatcher TVar (Set HashJWT)
hashesVar TVar (Set HashJWT)
uninsertedHashesVar TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
entitiesQueue TQueue (Set HashJWT, Maybe (NESet Hash32))
newTempEntitiesQueue WorkerCount
workerCount TMVar (SyncError DownloadEntitiesError)
downloaderFailedVar =
      (Scope -> IO (Either (SyncError DownloadEntitiesError) ()))
-> IO (Either (SyncError DownloadEntitiesError) ())
forall a. (Scope -> IO a) -> IO a
Ki.scoped \Scope
scope ->
        let loop :: IO (Either (SyncError Share.DownloadEntitiesError) ())
            loop :: IO (Either (SyncError DownloadEntitiesError) ())
loop =
              STM DispatcherJob -> IO DispatcherJob
forall a. STM a -> IO a
atomically (STM DispatcherJob
checkIfDownloaderFailedMode STM DispatcherJob -> STM DispatcherJob -> STM DispatcherJob
forall a. STM a -> STM a -> STM a
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> STM DispatcherJob
dispatchWorkMode STM DispatcherJob -> STM DispatcherJob -> STM DispatcherJob
forall a. STM a -> STM a -> STM a
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> STM DispatcherJob
checkIfDoneMode) IO DispatcherJob
-> (DispatcherJob
    -> IO (Either (SyncError DownloadEntitiesError) ()))
-> IO (Either (SyncError DownloadEntitiesError) ())
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
                DispatcherJob
DispatcherDone -> Either (SyncError DownloadEntitiesError) ()
-> IO (Either (SyncError DownloadEntitiesError) ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (() -> Either (SyncError DownloadEntitiesError) ()
forall a b. b -> Either a b
Right ())
                DispatcherReturnEarlyBecauseDownloaderFailed SyncError DownloadEntitiesError
err -> Either (SyncError DownloadEntitiesError) ()
-> IO (Either (SyncError DownloadEntitiesError) ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (SyncError DownloadEntitiesError
-> Either (SyncError DownloadEntitiesError) ()
forall a b. a -> Either a b
Left SyncError DownloadEntitiesError
err)
                DispatcherForkWorker NESet HashJWT
hashes -> do
                  STM () -> IO ()
forall a. STM a -> IO a
atomically do
                    -- Limit number of simultaneous downloaders (plus 2, for inserter and elaborator)
                    workers <- WorkerCount -> STM Int
forall a. TVar a -> STM a
readTVar WorkerCount
workerCount
                    check (workers < maxSimultaneousPullDownloaders + 2)
                    -- we do need to record the downloader as working outside of the worker thread, not inside.
                    -- otherwise, we might erroneously fall through the teardown logic below and conclude there's
                    -- nothing more for the dispatcher to do, when in fact a downloader thread just hasn't made it as
                    -- far as recording its own existence
                    recordWorking workerCount
                  _ <-
                    forall a. Scope -> IO a -> IO (Thread a)
Ki.fork @() Scope
scope do
                      TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
-> WorkerCount
-> NESet HashJWT
-> IO (Either (SyncError DownloadEntitiesError) ())
downloader TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
entitiesQueue WorkerCount
workerCount NESet HashJWT
hashes IO (Either (SyncError DownloadEntitiesError) ())
-> (IO (Either (SyncError DownloadEntitiesError) ()) -> IO ())
-> IO ()
forall a b. a -> (a -> b) -> b
& (SyncError DownloadEntitiesError -> IO ())
-> IO (Either (SyncError DownloadEntitiesError) ()) -> IO ()
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> m (Either a b) -> m b
onLeftM \SyncError DownloadEntitiesError
err ->
                        IO Bool -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (TMVar (SyncError DownloadEntitiesError)
-> SyncError DownloadEntitiesError -> STM Bool
forall a. TMVar a -> a -> STM Bool
tryPutTMVar TMVar (SyncError DownloadEntitiesError)
downloaderFailedVar SyncError DownloadEntitiesError
err))
                  loop
         in IO (Either (SyncError DownloadEntitiesError) ())
loop
      where
        checkIfDownloaderFailedMode :: STM DispatcherJob
        checkIfDownloaderFailedMode :: STM DispatcherJob
checkIfDownloaderFailedMode =
          SyncError DownloadEntitiesError -> DispatcherJob
DispatcherReturnEarlyBecauseDownloaderFailed (SyncError DownloadEntitiesError -> DispatcherJob)
-> STM (SyncError DownloadEntitiesError) -> STM DispatcherJob
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TMVar (SyncError DownloadEntitiesError)
-> STM (SyncError DownloadEntitiesError)
forall a. TMVar a -> STM a
readTMVar TMVar (SyncError DownloadEntitiesError)
downloaderFailedVar

        dispatchWorkMode :: STM DispatcherJob
        dispatchWorkMode :: STM DispatcherJob
dispatchWorkMode = do
          hashes <- TVar (Set HashJWT) -> STM (Set HashJWT)
forall a. TVar a -> STM a
readTVar TVar (Set HashJWT)
hashesVar
          check (not (Set.null hashes))
          let (hashes1, hashes2) = Set.splitAt syncChunkSize hashes
          modifyTVar' uninsertedHashesVar (Set.union hashes1)
          writeTVar hashesVar hashes2
          pure (DispatcherForkWorker (NESet.unsafeFromSet hashes1))

        -- Check to see if there are no hashes left to download, no outstanding workers, and no work in either queue
        checkIfDoneMode :: STM DispatcherJob
        checkIfDoneMode :: STM DispatcherJob
checkIfDoneMode = do
          workers <- WorkerCount -> STM Int
forall a. TVar a -> STM a
readTVar WorkerCount
workerCount
          check (workers == 0)
          isEmptyTQueue entitiesQueue >>= check
          isEmptyTQueue newTempEntitiesQueue >>= check
          pure DispatcherDone

    -- Downloader thread: download entities, (if successful) enqueue to `entitiesQueue`
    downloader ::
      TQueue (NESet Share.HashJWT, NEMap Hash32 (Share.Entity Text Hash32 Share.HashJWT)) ->
      WorkerCount ->
      NESet Share.HashJWT ->
      IO (Either (SyncError Share.DownloadEntitiesError) ())
    downloader :: TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
-> WorkerCount
-> NESet HashJWT
-> IO (Either (SyncError DownloadEntitiesError) ())
downloader TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
entitiesQueue WorkerCount
workerCount NESet HashJWT
hashes = do
      AuthenticatedHttpClient
-> BaseUrl
-> DownloadEntitiesRequest
-> IO (Either CodeserverTransportError DownloadEntitiesResponse)
httpDownloadEntities AuthenticatedHttpClient
httpClient BaseUrl
unisonShareUrl Share.DownloadEntitiesRequest {RepoInfo
repoInfo :: RepoInfo
repoInfo :: RepoInfo
repoInfo, NESet HashJWT
hashes :: NESet HashJWT
hashes :: NESet HashJWT
hashes} IO (Either CodeserverTransportError DownloadEntitiesResponse)
-> (Either CodeserverTransportError DownloadEntitiesResponse
    -> IO (Either (SyncError DownloadEntitiesError) ()))
-> IO (Either (SyncError DownloadEntitiesError) ())
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        Left CodeserverTransportError
err -> do
          STM () -> IO ()
forall a. STM a -> IO a
atomically (WorkerCount -> STM ()
recordNotWorking WorkerCount
workerCount)
          Either (SyncError DownloadEntitiesError) ()
-> IO (Either (SyncError DownloadEntitiesError) ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (SyncError DownloadEntitiesError
-> Either (SyncError DownloadEntitiesError) ()
forall a b. a -> Either a b
Left (CodeserverTransportError -> SyncError DownloadEntitiesError
forall e. CodeserverTransportError -> SyncError e
TransportError CodeserverTransportError
err))
        Right (Share.DownloadEntitiesFailure DownloadEntitiesError
err) -> do
          STM () -> IO ()
forall a. STM a -> IO a
atomically (WorkerCount -> STM ()
recordNotWorking WorkerCount
workerCount)
          Either (SyncError DownloadEntitiesError) ()
-> IO (Either (SyncError DownloadEntitiesError) ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (SyncError DownloadEntitiesError
-> Either (SyncError DownloadEntitiesError) ()
forall a b. a -> Either a b
Left (DownloadEntitiesError -> SyncError DownloadEntitiesError
forall e. e -> SyncError e
SyncError DownloadEntitiesError
err))
        Right (Share.DownloadEntitiesSuccess NEMap Hash32 (Entity Text Hash32 HashJWT)
entities) -> do
          Int -> IO ()
downloadedCallback (NESet HashJWT -> Int
forall a. NESet a -> Int
NESet.size NESet HashJWT
hashes)
          case NEMap Hash32 (Entity Text Hash32 HashJWT)
-> Either EntityValidationError ()
validateEntities NEMap Hash32 (Entity Text Hash32 HashJWT)
entities of
            Left EntityValidationError
err -> Either (SyncError DownloadEntitiesError) ()
-> IO (Either (SyncError DownloadEntitiesError) ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either (SyncError DownloadEntitiesError) ()
 -> IO (Either (SyncError DownloadEntitiesError) ()))
-> (EntityValidationError
    -> Either (SyncError DownloadEntitiesError) ())
-> EntityValidationError
-> IO (Either (SyncError DownloadEntitiesError) ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncError DownloadEntitiesError
-> Either (SyncError DownloadEntitiesError) ()
forall a b. a -> Either a b
Left (SyncError DownloadEntitiesError
 -> Either (SyncError DownloadEntitiesError) ())
-> (EntityValidationError -> SyncError DownloadEntitiesError)
-> EntityValidationError
-> Either (SyncError DownloadEntitiesError) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DownloadEntitiesError -> SyncError DownloadEntitiesError
forall e. e -> SyncError e
SyncError (DownloadEntitiesError -> SyncError DownloadEntitiesError)
-> (EntityValidationError -> DownloadEntitiesError)
-> EntityValidationError
-> SyncError DownloadEntitiesError
forall b c a. (b -> c) -> (a -> b) -> a -> c
. EntityValidationError -> DownloadEntitiesError
Share.DownloadEntitiesEntityValidationFailure (EntityValidationError
 -> IO (Either (SyncError DownloadEntitiesError) ()))
-> EntityValidationError
-> IO (Either (SyncError DownloadEntitiesError) ())
forall a b. (a -> b) -> a -> b
$ EntityValidationError
err
            Right () -> do
              STM () -> IO ()
forall a. STM a -> IO a
atomically do
                TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
-> (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
-> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
entitiesQueue (NESet HashJWT
hashes, NEMap Hash32 (Entity Text Hash32 HashJWT)
entities)
                WorkerCount -> STM ()
recordNotWorking WorkerCount
workerCount
              Either (SyncError DownloadEntitiesError) ()
-> IO (Either (SyncError DownloadEntitiesError) ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (() -> Either (SyncError DownloadEntitiesError) ()
forall a b. b -> Either a b
Right ())

    -- Inserter thread: dequeue from `entitiesQueue`, insert entities, enqueue to `newTempEntitiesQueue`
    inserter ::
      TQueue (NESet Share.HashJWT, NEMap Hash32 (Share.Entity Text Hash32 Share.HashJWT)) ->
      TQueue (Set Share.HashJWT, Maybe (NESet Hash32)) ->
      WorkerCount ->
      IO Void
    inserter :: TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
-> TQueue (Set HashJWT, Maybe (NESet Hash32))
-> WorkerCount
-> IO Void
inserter TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
entitiesQueue TQueue (Set HashJWT, Maybe (NESet Hash32))
newTempEntitiesQueue WorkerCount
workerCount =
      ((forall x. Transaction x -> IO x) -> IO Void) -> IO Void
forall a. ((forall x. Transaction x -> IO x) -> IO a) -> IO a
connect \forall x. Transaction x -> IO x
runTransaction ->
        IO () -> IO Void
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever do
          (hashJwts, entities) <-
            STM (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
-> IO (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
forall a. STM a -> IO a
atomically do
              entities <- TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
-> STM (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
forall a. TQueue a -> STM a
readTQueue TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
entitiesQueue
              recordWorking workerCount
              pure entities
          newTempEntities0 <-
            runTransaction do
              NEMap.toList entities & foldMapM \(Hash32
hash, Entity Text Hash32 HashJWT
entity) ->
                Hash32 -> Entity Text Hash32 HashJWT -> Transaction EntityLocation
upsertEntitySomewhere Hash32
hash Entity Text Hash32 HashJWT
entity Transaction EntityLocation
-> (EntityLocation -> Set Hash32) -> Transaction (Set Hash32)
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
                  EntityLocation
Q.EntityInMainStorage -> Set Hash32
forall a. Set a
Set.empty
                  EntityLocation
Q.EntityInTempStorage -> Hash32 -> Set Hash32
forall a. a -> Set a
Set.singleton Hash32
hash
          atomically do
            writeTQueue newTempEntitiesQueue (NESet.toSet hashJwts, NESet.nonEmptySet newTempEntities0)
            recordNotWorking workerCount

    -- Elaborator thread: dequeue from `newTempEntitiesQueue`, elaborate, "enqueue" to `hashesVar`
    elaborator ::
      TVar (Set Share.HashJWT) ->
      TVar (Set Share.HashJWT) ->
      TQueue (Set Share.HashJWT, Maybe (NESet Hash32)) ->
      WorkerCount ->
      IO Void
    elaborator :: TVar (Set HashJWT)
-> TVar (Set HashJWT)
-> TQueue (Set HashJWT, Maybe (NESet Hash32))
-> WorkerCount
-> IO Void
elaborator TVar (Set HashJWT)
hashesVar TVar (Set HashJWT)
uninsertedHashesVar TQueue (Set HashJWT, Maybe (NESet Hash32))
newTempEntitiesQueue WorkerCount
workerCount =
      ((forall x. Transaction x -> IO x) -> IO Void) -> IO Void
forall a. ((forall x. Transaction x -> IO x) -> IO a) -> IO a
connect \forall x. Transaction x -> IO x
runTransaction ->
        IO () -> IO Void
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever do
          maybeNewTempEntities <-
            STM (Maybe (NESet Hash32)) -> IO (Maybe (NESet Hash32))
forall a. STM a -> IO a
atomically do
              (hashJwts, mayNewTempEntities) <- TQueue (Set HashJWT, Maybe (NESet Hash32))
-> STM (Set HashJWT, Maybe (NESet Hash32))
forall a. TQueue a -> STM a
readTQueue TQueue (Set HashJWT, Maybe (NESet Hash32))
newTempEntitiesQueue
              -- Avoid unnecessary retaining of these hashes to keep memory usage more stable. This algorithm would
              -- still be correct if we never delete from `uninsertedHashes`.
              --
              -- We remove the inserted hashes from uninsertedHashesVar at this point rather than right after insertion
              -- in order to ensure that no running transaction of the elaborator is viewing a snapshot that precedes
              -- the snapshot that inserted those hashes.
              modifyTVar' uninsertedHashesVar \Set HashJWT
uninsertedHashes -> Set HashJWT -> Set HashJWT -> Set HashJWT
forall a. Ord a => Set a -> Set a -> Set a
Set.difference Set HashJWT
uninsertedHashes Set HashJWT
hashJwts
              case mayNewTempEntities of
                Maybe (NESet Hash32)
Nothing -> Maybe (NESet Hash32) -> STM (Maybe (NESet Hash32))
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (NESet Hash32)
forall a. Maybe a
Nothing
                Just NESet Hash32
newTempEntities -> do
                  WorkerCount -> STM ()
recordWorking WorkerCount
workerCount
                  Maybe (NESet Hash32) -> STM (Maybe (NESet Hash32))
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (NESet Hash32 -> Maybe (NESet Hash32)
forall a. a -> Maybe a
Just NESet Hash32
newTempEntities)
          whenJust maybeNewTempEntities \NESet Hash32
newTempEntities -> do
            newElaboratedHashes <- Transaction (Set HashJWT) -> IO (Set HashJWT)
forall x. Transaction x -> IO x
runTransaction (NESet Hash32 -> Transaction (Set HashJWT)
elaborateHashes NESet Hash32
newTempEntities)
            atomically do
              uninsertedHashes <- readTVar uninsertedHashesVar
              hashes0 <- readTVar hashesVar
              writeTVar hashesVar $! Set.union (Set.difference newElaboratedHashes uninsertedHashes) hashes0
              recordNotWorking workerCount

-- | Insert entities into the database, and return the subset that went into temp storage (`temp_entitiy`) rather than
-- of main storage (`object` / `causal`) due to missing dependencies.
insertEntities :: NEMap Hash32 (Share.Entity Text Hash32 Share.HashJWT) -> Sqlite.Transaction (Set Hash32)
insertEntities :: NEMap Hash32 (Entity Text Hash32 HashJWT)
-> Transaction (Set Hash32)
insertEntities NEMap Hash32 (Entity Text Hash32 HashJWT)
entities =
  NEMap Hash32 (Entity Text Hash32 HashJWT)
-> NonEmpty (Hash32, Entity Text Hash32 HashJWT)
forall k a. NEMap k a -> NonEmpty (k, a)
NEMap.toList NEMap Hash32 (Entity Text Hash32 HashJWT)
entities NonEmpty (Hash32, Entity Text Hash32 HashJWT)
-> (NonEmpty (Hash32, Entity Text Hash32 HashJWT)
    -> Transaction (Set Hash32))
-> Transaction (Set Hash32)
forall a b. a -> (a -> b) -> b
& ((Hash32, Entity Text Hash32 HashJWT) -> Transaction (Set Hash32))
-> NonEmpty (Hash32, Entity Text Hash32 HashJWT)
-> Transaction (Set Hash32)
forall (m :: * -> *) (f :: * -> *) b a.
(Monad m, Foldable f, Monoid b) =>
(a -> m b) -> f a -> m b
foldMapM \(Hash32
hash, Entity Text Hash32 HashJWT
entity) ->
    Hash32 -> Entity Text Hash32 HashJWT -> Transaction EntityLocation
upsertEntitySomewhere Hash32
hash Entity Text Hash32 HashJWT
entity Transaction EntityLocation
-> (EntityLocation -> Set Hash32) -> Transaction (Set Hash32)
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
      EntityLocation
Q.EntityInMainStorage -> Set Hash32
forall a. Set a
Set.empty
      EntityLocation
Q.EntityInTempStorage -> Hash32 -> Set Hash32
forall a. a -> Set a
Set.singleton Hash32
hash

------------------------------------------------------------------------------------------------------------------------
-- Get causal hash by path

-- | Get the causal hash of a path hosted on Unison Share.
getCausalHashByPath ::
  -- | The Unison Share URL.
  BaseUrl ->
  Share.Path ->
  Cli (Either (SyncError GetCausalHashByPathError) (Maybe Share.HashJWT))
getCausalHashByPath :: BaseUrl
-> Path
-> Cli
     (Either (SyncError GetCausalHashByPathError) (Maybe HashJWT))
getCausalHashByPath BaseUrl
unisonShareUrl Path
repoPath = do
  Cli.Env {authHTTPClient} <- Cli Env
forall r (m :: * -> *). MonadReader r m => m r
ask
  liftIO (httpGetCausalHashByPath authHTTPClient unisonShareUrl (Share.GetCausalHashByPathRequest repoPath)) <&> \case
    Left CodeserverTransportError
err -> SyncError GetCausalHashByPathError
-> Either (SyncError GetCausalHashByPathError) (Maybe HashJWT)
forall a b. a -> Either a b
Left (CodeserverTransportError -> SyncError GetCausalHashByPathError
forall e. CodeserverTransportError -> SyncError e
TransportError CodeserverTransportError
err)
    Right (Share.GetCausalHashByPathSuccess Maybe HashJWT
maybeHashJwt) -> Maybe HashJWT
-> Either (SyncError GetCausalHashByPathError) (Maybe HashJWT)
forall a b. b -> Either a b
Right Maybe HashJWT
maybeHashJwt
    Right (Share.GetCausalHashByPathNoReadPermission Path
_) ->
      SyncError GetCausalHashByPathError
-> Either (SyncError GetCausalHashByPathError) (Maybe HashJWT)
forall a b. a -> Either a b
Left (GetCausalHashByPathError -> SyncError GetCausalHashByPathError
forall e. e -> SyncError e
SyncError (Path -> GetCausalHashByPathError
GetCausalHashByPathErrorNoReadPermission Path
repoPath))
    Right (Share.GetCausalHashByPathInvalidRepoInfo Text
err RepoInfo
repoInfo) ->
      SyncError GetCausalHashByPathError
-> Either (SyncError GetCausalHashByPathError) (Maybe HashJWT)
forall a b. a -> Either a b
Left (GetCausalHashByPathError -> SyncError GetCausalHashByPathError
forall e. e -> SyncError e
SyncError (Text -> RepoInfo -> GetCausalHashByPathError
GetCausalHashByPathErrorInvalidRepoInfo Text
err RepoInfo
repoInfo))
    Right GetCausalHashByPathResponse
Share.GetCausalHashByPathUserNotFound ->
      SyncError GetCausalHashByPathError
-> Either (SyncError GetCausalHashByPathError) (Maybe HashJWT)
forall a b. a -> Either a b
Left (GetCausalHashByPathError -> SyncError GetCausalHashByPathError
forall e. e -> SyncError e
SyncError (GetCausalHashByPathError -> SyncError GetCausalHashByPathError)
-> GetCausalHashByPathError -> SyncError GetCausalHashByPathError
forall a b. (a -> b) -> a -> b
$ RepoInfo -> GetCausalHashByPathError
GetCausalHashByPathErrorUserNotFound (Path -> RepoInfo
Share.pathRepoInfo Path
repoPath))

------------------------------------------------------------------------------------------------------------------------
-- Upload entities

data UploadDispatcherJob
  = UploadDispatcherReturnFailure (SyncError Share.UploadEntitiesError)
  | UploadDispatcherForkWorkerWhenAvailable (NESet Hash32)
  | UploadDispatcherForkWorker (NESet Hash32)
  | UploadDispatcherDone

-- | Upload a set of entities to Unison Share. If the server responds that it cannot yet store any hash(es) due to
-- missing dependencies, send those dependencies too, and on and on, until the server stops responding that it's missing
-- anything.
--
-- Returns true on success, false on failure (because the user does not have write permission).
uploadEntities ::
  BaseUrl ->
  Share.RepoInfo ->
  NESet Hash32 ->
  (Int -> IO ()) ->
  Cli (Either (SyncError Share.UploadEntitiesError) ())
uploadEntities :: BaseUrl
-> RepoInfo
-> NESet Hash32
-> (Int -> IO ())
-> Cli (Either (SyncError UploadEntitiesError) ())
uploadEntities BaseUrl
unisonShareUrl RepoInfo
repoInfo NESet Hash32
hashes0 Int -> IO ()
uploadedCallback = do
  Cli.Env {authHTTPClient, codebase} <- Cli Env
forall r (m :: * -> *). MonadReader r m => m r
ask

  liftIO do
    hashesVar <- newTVarIO (NESet.toSet hashes0)
    -- Semantically, this is the set of hashes we've uploaded so far, but we do delete from it when it's safe to, so it
    -- doesn't grow unbounded. It's used to filter out hashes that would be duplicate uploads: the server, when
    -- responding to any particular upload request, may declare that it still needs some hashes that we're in the
    -- process of uploading from another thread.
    dedupeVar <- newTVarIO Set.empty
    nextWorkerIdVar <- newTVarIO 0
    workersVar <- newTVarIO Set.empty
    workerFailedVar <- newEmptyTMVarIO

    Ki.scoped \Scope
scope ->
      Scope
-> AuthenticatedHttpClient
-> (forall x. Transaction x -> IO x)
-> TVar (Set Hash32)
-> TVar (Set Hash32)
-> WorkerCount
-> TVar (Set Int)
-> TMVar (SyncError UploadEntitiesError)
-> IO (Either (SyncError UploadEntitiesError) ())
dispatcher
        Scope
scope
        AuthenticatedHttpClient
authHTTPClient
        (Codebase IO Symbol Ann -> Transaction a -> IO a
forall (m :: * -> *) v a b.
MonadIO m =>
Codebase m v a -> Transaction b -> m b
Codebase.runTransaction Codebase IO Symbol Ann
codebase)
        TVar (Set Hash32)
hashesVar
        TVar (Set Hash32)
dedupeVar
        WorkerCount
nextWorkerIdVar
        TVar (Set Int)
workersVar
        TMVar (SyncError UploadEntitiesError)
workerFailedVar
  where
    dispatcher ::
      Ki.Scope ->
      AuthenticatedHttpClient ->
      (forall a. Sqlite.Transaction a -> IO a) ->
      TVar (Set Hash32) ->
      TVar (Set Hash32) ->
      TVar Int ->
      TVar (Set Int) ->
      TMVar (SyncError Share.UploadEntitiesError) ->
      IO (Either (SyncError Share.UploadEntitiesError) ())
    dispatcher :: Scope
-> AuthenticatedHttpClient
-> (forall x. Transaction x -> IO x)
-> TVar (Set Hash32)
-> TVar (Set Hash32)
-> WorkerCount
-> TVar (Set Int)
-> TMVar (SyncError UploadEntitiesError)
-> IO (Either (SyncError UploadEntitiesError) ())
dispatcher Scope
scope AuthenticatedHttpClient
httpClient forall x. Transaction x -> IO x
runTransaction TVar (Set Hash32)
hashesVar TVar (Set Hash32)
dedupeVar WorkerCount
nextWorkerIdVar TVar (Set Int)
workersVar TMVar (SyncError UploadEntitiesError)
workerFailedVar = do
      loop
      where
        loop :: IO (Either (SyncError Share.UploadEntitiesError) ())
        loop :: IO (Either (SyncError UploadEntitiesError) ())
loop =
          [STM UploadDispatcherJob]
-> IO (Either (SyncError UploadEntitiesError) ())
doJob [STM UploadDispatcherJob
checkForFailureMode, STM UploadDispatcherJob
dispatchWorkMode, STM UploadDispatcherJob
checkIfDoneMode]

        doJob :: [STM UploadDispatcherJob] -> IO (Either (SyncError Share.UploadEntitiesError) ())
        doJob :: [STM UploadDispatcherJob]
-> IO (Either (SyncError UploadEntitiesError) ())
doJob [STM UploadDispatcherJob]
jobs =
          STM UploadDispatcherJob -> IO UploadDispatcherJob
forall a. STM a -> IO a
atomically ([STM UploadDispatcherJob] -> STM UploadDispatcherJob
forall (t :: * -> *) (f :: * -> *) a.
(Foldable t, Alternative f) =>
t (f a) -> f a
asum [STM UploadDispatcherJob]
jobs) IO UploadDispatcherJob
-> (UploadDispatcherJob
    -> IO (Either (SyncError UploadEntitiesError) ()))
-> IO (Either (SyncError UploadEntitiesError) ())
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
            UploadDispatcherReturnFailure SyncError UploadEntitiesError
err -> Either (SyncError UploadEntitiesError) ()
-> IO (Either (SyncError UploadEntitiesError) ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (SyncError UploadEntitiesError
-> Either (SyncError UploadEntitiesError) ()
forall a b. a -> Either a b
Left SyncError UploadEntitiesError
err)
            UploadDispatcherForkWorkerWhenAvailable NESet Hash32
hashes -> [STM UploadDispatcherJob]
-> IO (Either (SyncError UploadEntitiesError) ())
doJob [NESet Hash32 -> STM UploadDispatcherJob
forkWorkerMode NESet Hash32
hashes, STM UploadDispatcherJob
checkForFailureMode]
            UploadDispatcherForkWorker NESet Hash32
hashes -> do
              workerId <-
                STM Int -> IO Int
forall a. STM a -> IO a
atomically do
                  workerId <- WorkerCount -> STM Int
forall a. TVar a -> STM a
readTVar WorkerCount
nextWorkerIdVar
                  writeTVar nextWorkerIdVar $! workerId + 1
                  modifyTVar' workersVar (Set.insert workerId)
                  pure workerId
              _ <-
                Ki.fork @() scope do
                  worker httpClient runTransaction hashesVar dedupeVar workersVar workerFailedVar workerId hashes
              loop
            UploadDispatcherJob
UploadDispatcherDone -> Either (SyncError UploadEntitiesError) ()
-> IO (Either (SyncError UploadEntitiesError) ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (() -> Either (SyncError UploadEntitiesError) ()
forall a b. b -> Either a b
Right ())

        checkForFailureMode :: STM UploadDispatcherJob
        checkForFailureMode :: STM UploadDispatcherJob
checkForFailureMode = do
          err <- TMVar (SyncError UploadEntitiesError)
-> STM (SyncError UploadEntitiesError)
forall a. TMVar a -> STM a
readTMVar TMVar (SyncError UploadEntitiesError)
workerFailedVar
          pure (UploadDispatcherReturnFailure err)

        dispatchWorkMode :: STM UploadDispatcherJob
        dispatchWorkMode :: STM UploadDispatcherJob
dispatchWorkMode = do
          hashes <- TVar (Set Hash32) -> STM (Set Hash32)
forall a. TVar a -> STM a
readTVar TVar (Set Hash32)
hashesVar
          when (Set.null hashes) retry
          let (hashes1, hashes2) = Set.splitAt syncChunkSize hashes
          modifyTVar' dedupeVar (Set.union hashes1)
          writeTVar hashesVar hashes2
          pure (UploadDispatcherForkWorkerWhenAvailable (NESet.unsafeFromSet hashes1))

        forkWorkerMode :: NESet Hash32 -> STM UploadDispatcherJob
        forkWorkerMode :: NESet Hash32 -> STM UploadDispatcherJob
forkWorkerMode NESet Hash32
hashes = do
          workers <- TVar (Set Int) -> STM (Set Int)
forall a. TVar a -> STM a
readTVar TVar (Set Int)
workersVar
          when (Set.size workers >= maxSimultaneousPushWorkers) retry
          pure (UploadDispatcherForkWorker hashes)

        checkIfDoneMode :: STM UploadDispatcherJob
        checkIfDoneMode :: STM UploadDispatcherJob
checkIfDoneMode = do
          workers <- TVar (Set Int) -> STM (Set Int)
forall a. TVar a -> STM a
readTVar TVar (Set Int)
workersVar
          when (not (Set.null workers)) retry
          pure UploadDispatcherDone

    worker ::
      AuthenticatedHttpClient ->
      (forall a. Sqlite.Transaction a -> IO a) ->
      TVar (Set Hash32) ->
      TVar (Set Hash32) ->
      TVar (Set Int) ->
      TMVar (SyncError Share.UploadEntitiesError) ->
      Int ->
      NESet Hash32 ->
      IO ()
    worker :: AuthenticatedHttpClient
-> (forall x. Transaction x -> IO x)
-> TVar (Set Hash32)
-> TVar (Set Hash32)
-> TVar (Set Int)
-> TMVar (SyncError UploadEntitiesError)
-> Int
-> NESet Hash32
-> IO ()
worker AuthenticatedHttpClient
httpClient forall x. Transaction x -> IO x
runTransaction TVar (Set Hash32)
hashesVar TVar (Set Hash32)
dedupeVar TVar (Set Int)
workersVar TMVar (SyncError UploadEntitiesError)
workerFailedVar Int
workerId NESet Hash32
hashes = do
      entities <-
        (NonEmpty (Hash32, Entity Text Hash32 Hash32)
 -> NEMap Hash32 (Entity Text Hash32 Hash32))
-> IO (NonEmpty (Hash32, Entity Text Hash32 Hash32))
-> IO (NEMap Hash32 (Entity Text Hash32 Hash32))
forall a b. (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap NonEmpty (Hash32, Entity Text Hash32 Hash32)
-> NEMap Hash32 (Entity Text Hash32 Hash32)
forall k a. Eq k => NonEmpty (k, a) -> NEMap k a
NEMap.fromAscList do
          Transaction (NonEmpty (Hash32, Entity Text Hash32 Hash32))
-> IO (NonEmpty (Hash32, Entity Text Hash32 Hash32))
forall x. Transaction x -> IO x
runTransaction do
            NonEmpty Hash32
-> (Hash32 -> Transaction (Hash32, Entity Text Hash32 Hash32))
-> Transaction (NonEmpty (Hash32, Entity Text Hash32 Hash32))
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
t a -> (a -> f b) -> f (t b)
for (NESet Hash32 -> NonEmpty Hash32
forall a. NESet a -> NonEmpty a
NESet.toAscList NESet Hash32
hashes) \Hash32
hash -> do
              entity <- Hash32 -> Transaction (Entity Text Hash32 Hash32)
expectEntity Hash32
hash
              pure (hash, entity)

      result <-
        httpUploadEntities httpClient unisonShareUrl Share.UploadEntitiesRequest {entities, repoInfo} <&> \case
          Left CodeserverTransportError
err -> SyncError UploadEntitiesError
-> Either (SyncError UploadEntitiesError) (Set Hash32)
forall a b. a -> Either a b
Left (CodeserverTransportError -> SyncError UploadEntitiesError
forall e. CodeserverTransportError -> SyncError e
TransportError CodeserverTransportError
err)
          Right UploadEntitiesResponse
response ->
            case UploadEntitiesResponse
response of
              UploadEntitiesResponse
Share.UploadEntitiesSuccess -> Set Hash32 -> Either (SyncError UploadEntitiesError) (Set Hash32)
forall a b. b -> Either a b
Right Set Hash32
forall a. Set a
Set.empty
              Share.UploadEntitiesFailure UploadEntitiesError
err ->
                case UploadEntitiesError
err of
                  Share.UploadEntitiesError'NeedDependencies (Share.NeedDependencies NESet Hash32
moreHashes) ->
                    Set Hash32 -> Either (SyncError UploadEntitiesError) (Set Hash32)
forall a b. b -> Either a b
Right (NESet Hash32 -> Set Hash32
forall a. NESet a -> Set a
NESet.toSet NESet Hash32
moreHashes)
                  UploadEntitiesError
err -> SyncError UploadEntitiesError
-> Either (SyncError UploadEntitiesError) (Set Hash32)
forall a b. a -> Either a b
Left (UploadEntitiesError -> SyncError UploadEntitiesError
forall e. e -> SyncError e
SyncError UploadEntitiesError
err)

      case result of
        Left SyncError UploadEntitiesError
err -> IO Bool -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (TMVar (SyncError UploadEntitiesError)
-> SyncError UploadEntitiesError -> STM Bool
forall a. TMVar a -> a -> STM Bool
tryPutTMVar TMVar (SyncError UploadEntitiesError)
workerFailedVar SyncError UploadEntitiesError
err))
        Right Set Hash32
moreHashes -> do
          Int -> IO ()
uploadedCallback (NESet Hash32 -> Int
forall a. NESet a -> Int
NESet.size NESet Hash32
hashes)
          maybeYoungestWorkerThatWasAlive <-
            STM (Maybe Int) -> IO (Maybe Int)
forall a. STM a -> IO a
atomically do
              -- Record ourselves as "dead". The only work we have left to do is remove the hashes we just uploaded from
              -- the `dedupe` set, but whether or not we are "alive" is relevant only to:
              --
              --   - The main dispatcher thread, which terminates when there are no more hashes to upload, and no alive
              --     workers. It is not important for us to delete from the `dedupe` set in this case.
              --
              --   - Other worker threads, each of which independently decides when it is safe to delete the set of
              --     hashes they just uploaded from the `dedupe` set (as we are doing now).
              !workers <- Int -> Set Int -> Set Int
forall a. Ord a => a -> Set a -> Set a
Set.delete Int
workerId (Set Int -> Set Int) -> STM (Set Int) -> STM (Set Int)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar (Set Int) -> STM (Set Int)
forall a. TVar a -> STM a
readTVar TVar (Set Int)
workersVar
              writeTVar workersVar workers
              -- Add more work (i.e. hashes to upload) to the work queue (really a work set), per the response we just
              -- got from the server. Remember to only add hashes that aren't in the `dedupe` set (see the comment on
              -- the dedupe set above for more info).
              when (not (Set.null moreHashes)) do
                dedupe <- readTVar dedupeVar
                hashes0 <- readTVar hashesVar
                writeTVar hashesVar $! Set.union (Set.difference moreHashes dedupe) hashes0
              pure (Set.lookupMax workers)
          -- Block until we are sure that the server does not have any uncommitted transactions that see a version of
          -- the database that does not include the entities we just uploaded. After that point, it's fine to remove the
          -- hashes of the entities we just uploaded from the `dedupe` set, because they will never be relevant for any
          -- subsequent deduping operations. If we didn't delete from the `dedupe` set, this algorithm would still be
          -- correct, it would just use an unbounded amount of memory to remember all the hashes we've uploaded so far.
          whenJust maybeYoungestWorkerThatWasAlive \Int
youngestWorkerThatWasAlive -> do
            STM () -> IO ()
forall a. STM a -> IO a
atomically do
              workers <- TVar (Set Int) -> STM (Set Int)
forall a. TVar a -> STM a
readTVar TVar (Set Int)
workersVar
              whenJust (Set.lookupMin workers) \Int
oldestWorkerAlive ->
                Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
oldestWorkerAlive Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
youngestWorkerThatWasAlive) STM ()
forall a. STM a
retry
          atomically (modifyTVar' dedupeVar (`Set.difference` (NESet.toSet hashes)))

------------------------------------------------------------------------------------------------------------------------
-- Database operations

-- | "Elaborate" a set of `temp_entity` hashes.
--
-- For each hash, then we ought to instead download its missing dependencies (which themselves are
--    elaborated by this same procedure, in case we have any of *them* already in temp storage, too.
-- 3. If it's in main storage, we should ignore it.
--
-- In the end, we return a set of hashes that correspond to entities we actually need to download.
elaborateHashes :: NESet Hash32 -> Sqlite.Transaction (Set Share.HashJWT)
elaborateHashes :: NESet Hash32 -> Transaction (Set HashJWT)
elaborateHashes NESet Hash32
hashes =
  NonEmpty Hash32 -> Transaction [Text]
Q.elaborateHashes (NESet Hash32 -> NonEmpty Hash32
forall a. NESet a -> NonEmpty a
NESet.toList NESet Hash32
hashes) Transaction [Text]
-> ([Text] -> Set HashJWT) -> Transaction (Set HashJWT)
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> [HashJWT] -> Set HashJWT
forall a. Ord a => [a] -> Set a
Set.fromList ([HashJWT] -> Set HashJWT)
-> ([Text] -> [HashJWT]) -> [Text] -> Set HashJWT
forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. Coercible a b => a -> b
forall a b. Coercible a b => a -> b
coerce @[Text] @[Share.HashJWT]

-- | Upsert a downloaded entity "somewhere" -
--
--   1. Nowhere if we already had the entity (in main or temp storage).
--   2. In main storage if we already have all of its dependencies in main storage.
--   3. In temp storage otherwise.
upsertEntitySomewhere ::
  Hash32 ->
  Share.Entity Text Hash32 Share.HashJWT ->
  Sqlite.Transaction Q.EntityLocation
upsertEntitySomewhere :: Hash32 -> Entity Text Hash32 HashJWT -> Transaction EntityLocation
upsertEntitySomewhere Hash32
hash Entity Text Hash32 HashJWT
entity =
  Hash32 -> Transaction (Maybe EntityLocation)
Q.entityLocation Hash32
hash Transaction (Maybe EntityLocation)
-> (Maybe EntityLocation -> Transaction EntityLocation)
-> Transaction EntityLocation
forall a b. Transaction a -> (a -> Transaction b) -> Transaction b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    Just EntityLocation
location -> EntityLocation -> Transaction EntityLocation
forall a. a -> Transaction a
forall (f :: * -> *) a. Applicative f => a -> f a
pure EntityLocation
location
    Maybe EntityLocation
Nothing -> do
      missingDependencies1 :: Map Hash32 Share.HashJWT <-
        Entity Text Hash32 HashJWT -> Set HashJWT
forall hash text noSyncHash.
Ord hash =>
Entity text noSyncHash hash -> Set hash
Share.entityDependencies Entity Text Hash32 HashJWT
entity
          Set HashJWT
-> (Set HashJWT -> Transaction (Map Hash32 HashJWT))
-> Transaction (Map Hash32 HashJWT)
forall a b. a -> (a -> b) -> b
& (HashJWT -> Transaction (Map Hash32 HashJWT))
-> Set HashJWT -> Transaction (Map Hash32 HashJWT)
forall (m :: * -> *) (f :: * -> *) b a.
(Monad m, Foldable f, Monoid b) =>
(a -> m b) -> f a -> m b
foldMapM
            ( \HashJWT
hashJwt -> do
                let hash :: Hash32
hash = HashJWT -> Hash32
Share.hashJWTHash HashJWT
hashJwt
                Hash32 -> Transaction Bool
Q.entityExists Hash32
hash Transaction Bool
-> (Bool -> Map Hash32 HashJWT) -> Transaction (Map Hash32 HashJWT)
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
                  Bool
True -> Map Hash32 HashJWT
forall k a. Map k a
Map.empty
                  Bool
False -> Hash32 -> HashJWT -> Map Hash32 HashJWT
forall k a. k -> a -> Map k a
Map.singleton Hash32
hash HashJWT
hashJwt
            )
      case NEMap.nonEmptyMap missingDependencies1 of
        Maybe (NEMap Hash32 HashJWT)
Nothing -> do
          _id <- HashHandle
-> Hash32
-> TempEntity
-> Transaction (Either CausalHashId ObjectId)
Q.saveTempEntityInMain HashHandle
v2HashHandle Hash32
hash ((HashJWT -> Hash32) -> Entity Text Hash32 HashJWT -> TempEntity
forall hash.
(hash -> Hash32) -> Entity Text Hash32 hash -> TempEntity
entityToTempEntity HashJWT -> Hash32
Share.hashJWTHash Entity Text Hash32 HashJWT
entity)
          pure Q.EntityInMainStorage
        Just NEMap Hash32 HashJWT
missingDependencies -> do
          Hash32 -> TempEntity -> NEMap Hash32 Text -> Transaction ()
Q.insertTempEntity
            Hash32
hash
            ((HashJWT -> Hash32) -> Entity Text Hash32 HashJWT -> TempEntity
forall hash.
(hash -> Hash32) -> Entity Text Hash32 hash -> TempEntity
entityToTempEntity HashJWT -> Hash32
Share.hashJWTHash Entity Text Hash32 HashJWT
entity)
            ( forall a b. Coercible a b => a -> b
forall a b. Coercible a b => a -> b
coerce
                @(NEMap Hash32 Share.HashJWT)
                @(NEMap Hash32 Text)
                NEMap Hash32 HashJWT
missingDependencies
            )
          EntityLocation -> Transaction EntityLocation
forall a. a -> Transaction a
forall (f :: * -> *) a. Applicative f => a -> f a
pure EntityLocation
Q.EntityInTempStorage

------------------------------------------------------------------------------------------------------------------------
-- HTTP calls

httpGetCausalHashByPath ::
  Auth.AuthenticatedHttpClient ->
  BaseUrl ->
  Share.GetCausalHashByPathRequest ->
  IO (Either CodeserverTransportError Share.GetCausalHashByPathResponse)
httpDownloadEntities ::
  Auth.AuthenticatedHttpClient ->
  BaseUrl ->
  Share.DownloadEntitiesRequest ->
  IO (Either CodeserverTransportError Share.DownloadEntitiesResponse)
httpUploadEntities ::
  Auth.AuthenticatedHttpClient ->
  BaseUrl ->
  Share.UploadEntitiesRequest ->
  IO (Either CodeserverTransportError Share.UploadEntitiesResponse)
( AuthenticatedHttpClient
-> BaseUrl
-> GetCausalHashByPathRequest
-> IO (Either CodeserverTransportError GetCausalHashByPathResponse)
httpGetCausalHashByPath,
  AuthenticatedHttpClient
-> BaseUrl
-> DownloadEntitiesRequest
-> IO (Either CodeserverTransportError DownloadEntitiesResponse)
httpDownloadEntities,
  AuthenticatedHttpClient
-> BaseUrl
-> UploadEntitiesRequest
-> IO (Either CodeserverTransportError UploadEntitiesResponse)
httpUploadEntities
  ) =
    let ( GetCausalHashByPathRequest
-> ReaderT
     ClientEnv
     (ExceptT CodeserverTransportError IO)
     GetCausalHashByPathResponse
httpGetCausalHashByPath
            Servant.:<|> DownloadEntitiesRequest
-> ReaderT
     ClientEnv
     (ExceptT CodeserverTransportError IO)
     DownloadEntitiesResponse
httpDownloadEntities
            Servant.:<|> UploadEntitiesRequest
-> ReaderT
     ClientEnv
     (ExceptT CodeserverTransportError IO)
     UploadEntitiesResponse
httpUploadEntities
          ) =
            let pp :: Proxy ("ucm" Servant.:> "v1" Servant.:> "sync" Servant.:> Share.API)
                pp :: Proxy ("ucm" :> ("v1" :> ("sync" :> API)))
pp = Proxy ("ucm" :> ("v1" :> ("sync" :> API)))
forall {k} (t :: k). Proxy t
Proxy
             in Proxy ("ucm" :> ("v1" :> ("sync" :> API)))
-> (forall a.
    ClientM a
    -> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) a)
-> Client ClientM ("ucm" :> ("v1" :> ("sync" :> API)))
-> Client
     (ReaderT ClientEnv (ExceptT CodeserverTransportError IO))
     ("ucm" :> ("v1" :> ("sync" :> API)))
forall api (m :: * -> *) (n :: * -> *).
HasClient ClientM api =>
Proxy api -> (forall a. m a -> n a) -> Client m api -> Client n api
Servant.hoistClient Proxy ("ucm" :> ("v1" :> ("sync" :> API)))
pp ClientM a
-> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) a
forall a.
ClientM a
-> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) a
hoist (Proxy ("ucm" :> ("v1" :> ("sync" :> API)))
-> Client ClientM ("ucm" :> ("v1" :> ("sync" :> API)))
forall api.
HasClient ClientM api =>
Proxy api -> Client ClientM api
Servant.client Proxy ("ucm" :> ("v1" :> ("sync" :> API)))
pp)
     in ( (GetCausalHashByPathRequest
 -> ReaderT
      ClientEnv
      (ExceptT CodeserverTransportError IO)
      GetCausalHashByPathResponse)
-> AuthenticatedHttpClient
-> BaseUrl
-> GetCausalHashByPathRequest
-> IO (Either CodeserverTransportError GetCausalHashByPathResponse)
forall req resp.
(req
 -> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) resp)
-> AuthenticatedHttpClient
-> BaseUrl
-> req
-> IO (Either CodeserverTransportError resp)
go GetCausalHashByPathRequest
-> ReaderT
     ClientEnv
     (ExceptT CodeserverTransportError IO)
     GetCausalHashByPathResponse
httpGetCausalHashByPath,
          (DownloadEntitiesRequest
 -> ReaderT
      ClientEnv
      (ExceptT CodeserverTransportError IO)
      DownloadEntitiesResponse)
-> AuthenticatedHttpClient
-> BaseUrl
-> DownloadEntitiesRequest
-> IO (Either CodeserverTransportError DownloadEntitiesResponse)
forall req resp.
(req
 -> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) resp)
-> AuthenticatedHttpClient
-> BaseUrl
-> req
-> IO (Either CodeserverTransportError resp)
go DownloadEntitiesRequest
-> ReaderT
     ClientEnv
     (ExceptT CodeserverTransportError IO)
     DownloadEntitiesResponse
httpDownloadEntities,
          (UploadEntitiesRequest
 -> ReaderT
      ClientEnv
      (ExceptT CodeserverTransportError IO)
      UploadEntitiesResponse)
-> AuthenticatedHttpClient
-> BaseUrl
-> UploadEntitiesRequest
-> IO (Either CodeserverTransportError UploadEntitiesResponse)
forall req resp.
(req
 -> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) resp)
-> AuthenticatedHttpClient
-> BaseUrl
-> req
-> IO (Either CodeserverTransportError resp)
go UploadEntitiesRequest
-> ReaderT
     ClientEnv
     (ExceptT CodeserverTransportError IO)
     UploadEntitiesResponse
httpUploadEntities
        )
    where
      hoist :: Servant.ClientM a -> ReaderT Servant.ClientEnv (ExceptT CodeserverTransportError IO) a
      hoist :: forall a.
ClientM a
-> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) a
hoist ClientM a
m = do
        clientEnv <- ReaderT ClientEnv (ExceptT CodeserverTransportError IO) ClientEnv
forall (m :: * -> *) r. Monad m => ReaderT r m r
Reader.ask
        liftIO (Servant.runClientM m clientEnv) >>= \case
          Right a
a -> a -> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) a
forall a.
a -> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
a
          Left ClientError
err -> do
            DebugFlag
-> String
-> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) ()
forall (m :: * -> *). Monad m => DebugFlag -> String -> m ()
Debug.debugLogM DebugFlag
Debug.Sync (ClientError -> String
forall a. Show a => a -> String
show ClientError
err)
            CodeserverTransportError
-> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) a
forall a.
CodeserverTransportError
-> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError case ClientError
err of
              Servant.FailureResponse RequestF () (BaseUrl, ByteString)
_req Response
resp ->
                case Status -> Int
HTTP.statusCode (Status -> Int) -> Status -> Int
forall a b. (a -> b) -> a -> b
$ Response -> Status
forall a. ResponseF a -> Status
Servant.responseStatusCode Response
resp of
                  Int
401 -> BaseUrl -> CodeserverTransportError
Unauthenticated (ClientEnv -> BaseUrl
Servant.baseUrl ClientEnv
clientEnv)
                  -- The server should provide semantically relevant permission-denied messages
                  -- when possible, but this should catch any we miss.
                  Int
403 -> Text -> CodeserverTransportError
PermissionDenied (LazyText -> Text
Text.Lazy.toStrict (LazyText -> Text)
-> (ByteString -> LazyText) -> ByteString -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> LazyText
Text.Lazy.decodeUtf8 (ByteString -> Text) -> ByteString -> Text
forall a b. (a -> b) -> a -> b
$ Response -> ByteString
forall a. ResponseF a -> a
Servant.responseBody Response
resp)
                  Int
408 -> CodeserverTransportError
Timeout
                  Int
429 -> CodeserverTransportError
RateLimitExceeded
                  Int
504 -> CodeserverTransportError
Timeout
                  Int
_ -> Response -> CodeserverTransportError
UnexpectedResponse Response
resp
              Servant.DecodeFailure Text
msg Response
resp -> Text -> Response -> CodeserverTransportError
DecodeFailure Text
msg Response
resp
              Servant.UnsupportedContentType MediaType
_ct Response
resp -> Response -> CodeserverTransportError
UnexpectedResponse Response
resp
              Servant.InvalidContentTypeHeader Response
resp -> Response -> CodeserverTransportError
UnexpectedResponse Response
resp
              Servant.ConnectionError SomeException
_ -> BaseUrl -> CodeserverTransportError
UnreachableCodeserver (ClientEnv -> BaseUrl
Servant.baseUrl ClientEnv
clientEnv)

      go ::
        (req -> ReaderT Servant.ClientEnv (ExceptT CodeserverTransportError IO) resp) ->
        Auth.AuthenticatedHttpClient ->
        BaseUrl ->
        req ->
        IO (Either CodeserverTransportError resp)
      go :: forall req resp.
(req
 -> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) resp)
-> AuthenticatedHttpClient
-> BaseUrl
-> req
-> IO (Either CodeserverTransportError resp)
go req -> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) resp
f (Auth.AuthenticatedHttpClient Manager
httpClient) BaseUrl
unisonShareUrl req
req =
        (Manager -> BaseUrl -> ClientEnv
Servant.mkClientEnv Manager
httpClient BaseUrl
unisonShareUrl)
          { Servant.makeClientRequest = \BaseUrl
url Request
request ->
              -- Disable client-side timeouts
              (BaseUrl -> Request -> IO Request
forall (f :: * -> *).
Applicative f =>
BaseUrl -> Request -> f Request
Servant.defaultMakeClientRequest BaseUrl
url Request
request)
                IO Request -> (Request -> Request) -> IO Request
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \Request
r ->
                  Request
r
                    { Http.Client.responseTimeout = Http.Client.responseTimeoutNone
                    }
          }
          ClientEnv
-> (ClientEnv -> ExceptT CodeserverTransportError IO resp)
-> ExceptT CodeserverTransportError IO resp
forall a b. a -> (a -> b) -> b
& ReaderT ClientEnv (ExceptT CodeserverTransportError IO) resp
-> ClientEnv -> ExceptT CodeserverTransportError IO resp
forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
runReaderT (req -> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) resp
f req
req)
          ExceptT CodeserverTransportError IO resp
-> (ExceptT CodeserverTransportError IO resp
    -> IO (Either CodeserverTransportError resp))
-> IO (Either CodeserverTransportError resp)
forall a b. a -> (a -> b) -> b
& ExceptT CodeserverTransportError IO resp
-> IO (Either CodeserverTransportError resp)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT