{-# LANGUAGE DataKinds #-}
{-# LANGUAGE TypeOperators #-}

module Unison.Share.Sync
  ( -- ** Get causal hash by path
    getCausalHashByPath,
    GetCausalHashByPathError (..),

    -- ** Upload
    uploadEntities,

    -- ** Pull/Download
    pull,
    PullError (..),
    downloadEntities,
  )
where

import Control.Concurrent.STM
import Control.Lens
import Control.Monad.Except
import Control.Monad.Reader (ask)
import Control.Monad.Trans.Reader (ReaderT, runReaderT)
import Control.Monad.Trans.Reader qualified as Reader
import Data.Map qualified as Map
import Data.Map.NonEmpty (NEMap)
import Data.Map.NonEmpty qualified as NEMap
import Data.Proxy
import Data.Set qualified as Set
import Data.Set.NonEmpty (NESet)
import Data.Set.NonEmpty qualified as NESet
import Data.Text.Lazy qualified as Text.Lazy
import Data.Text.Lazy.Encoding qualified as Text.Lazy
import GHC.IO (unsafePerformIO)
import Ki qualified
import Network.HTTP.Client qualified as Http.Client
import Network.HTTP.Types qualified as HTTP
import Servant.API qualified as Servant ((:<|>) (..), (:>))
import Servant.Client (BaseUrl)
import Servant.Client qualified as Servant
import System.Environment (lookupEnv)
import U.Codebase.HashTags (CausalHash)
import U.Codebase.Sqlite.Queries qualified as Q
import U.Codebase.Sqlite.V2.HashHandle (v2HashHandle)
import Unison.Auth.HTTPClient (AuthenticatedHttpClient)
import Unison.Auth.HTTPClient qualified as Auth
import Unison.Cli.Monad (Cli)
import Unison.Cli.Monad qualified as Cli
import Unison.Codebase qualified as Codebase
import Unison.Debug qualified as Debug
import Unison.Hash32 (Hash32)
import Unison.Prelude
import Unison.Share.API.Hash qualified as Share
import Unison.Share.ExpectedHashMismatches (expectedCausalHashMismatches, expectedComponentHashMismatches)
import Unison.Share.Sync.Types
import Unison.Sqlite qualified as Sqlite
import Unison.Sync.API qualified as Share (API)
import Unison.Sync.Common (entityToTempEntity, expectEntity, hash32ToCausalHash)
import Unison.Sync.EntityValidation qualified as EV
import Unison.Sync.Types qualified as Share
import Unison.Util.Monoid (foldMapM)

------------------------------------------------------------------------------------------------------------------------
-- Pile of constants

-- | The maximum number of downloader threads, during a pull.
maxSimultaneousPullDownloaders :: Int
maxSimultaneousPullDownloaders :: Int
maxSimultaneousPullDownloaders = IO Int -> Int
forall a. IO a -> a
unsafePerformIO (IO Int -> Int) -> IO Int -> Int
forall a b. (a -> b) -> a -> b
$ do
  String -> IO (Maybe String)
lookupEnv String
"UNISON_PULL_WORKERS" IO (Maybe String) -> (Maybe String -> Int) -> IO Int
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
    Just String
n -> String -> Int
forall a. Read a => String -> a
read String
n
    Maybe String
Nothing -> Int
5
{-# NOINLINE maxSimultaneousPullDownloaders #-}

-- | The maximum number of push workers at a time. Each push worker reads from the database and uploads entities.
-- Share currently parallelizes on it's own in the backend, and any more than one push worker
-- just results in serialization conflicts which slow things down.
maxSimultaneousPushWorkers :: Int
maxSimultaneousPushWorkers :: Int
maxSimultaneousPushWorkers = IO Int -> Int
forall a. IO a -> a
unsafePerformIO (IO Int -> Int) -> IO Int -> Int
forall a b. (a -> b) -> a -> b
$ do
  String -> IO (Maybe String)
lookupEnv String
"UNISON_PUSH_WORKERS" IO (Maybe String) -> (Maybe String -> Int) -> IO Int
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
    Just String
n -> String -> Int
forall a. Read a => String -> a
read String
n
    Maybe String
Nothing -> Int
1
{-# NOINLINE maxSimultaneousPushWorkers #-}

syncChunkSize :: Int
syncChunkSize :: Int
syncChunkSize = IO Int -> Int
forall a. IO a -> a
unsafePerformIO (IO Int -> Int) -> IO Int -> Int
forall a b. (a -> b) -> a -> b
$ do
  String -> IO (Maybe String)
lookupEnv String
"UNISON_SYNC_CHUNK_SIZE" IO (Maybe String) -> (Maybe String -> Int) -> IO Int
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
    Just String
n -> String -> Int
forall a. Read a => String -> a
read String
n
    Maybe String
Nothing -> Int
50
{-# NOINLINE syncChunkSize #-}

------------------------------------------------------------------------------------------------------------------------
-- Pull

pull ::
  -- | The Unison Share URL.
  BaseUrl ->
  -- | The repo+path to pull from.
  Share.Path ->
  -- | Callback that's given a number of entities we just downloaded.
  (Int -> IO ()) ->
  Cli (Either (SyncError PullError) CausalHash)
pull :: BaseUrl
-> Path
-> (Int -> IO ())
-> Cli (Either (SyncError PullError) CausalHash)
pull BaseUrl
unisonShareUrl Path
repoPath Int -> IO ()
downloadedCallback =
  BaseUrl
-> Path
-> Cli
     (Either (SyncError GetCausalHashByPathError) (Maybe HashJWT))
getCausalHashByPath BaseUrl
unisonShareUrl Path
repoPath Cli (Either (SyncError GetCausalHashByPathError) (Maybe HashJWT))
-> (Either (SyncError GetCausalHashByPathError) (Maybe HashJWT)
    -> Cli (Either (SyncError PullError) CausalHash))
-> Cli (Either (SyncError PullError) CausalHash)
forall a b. Cli a -> (a -> Cli b) -> Cli b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    Left SyncError GetCausalHashByPathError
err -> Either (SyncError PullError) CausalHash
-> Cli (Either (SyncError PullError) CausalHash)
forall a. a -> Cli a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (SyncError PullError -> Either (SyncError PullError) CausalHash
forall a b. a -> Either a b
Left (GetCausalHashByPathError -> PullError
PullError'GetCausalHash (GetCausalHashByPathError -> PullError)
-> SyncError GetCausalHashByPathError -> SyncError PullError
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> SyncError GetCausalHashByPathError
err))
    -- There's nothing at the remote path, so there's no causal to pull.
    Right Maybe HashJWT
Nothing -> Either (SyncError PullError) CausalHash
-> Cli (Either (SyncError PullError) CausalHash)
forall a. a -> Cli a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (SyncError PullError -> Either (SyncError PullError) CausalHash
forall a b. a -> Either a b
Left (PullError -> SyncError PullError
forall e. e -> SyncError e
SyncError (Path -> PullError
PullError'NoHistoryAtPath Path
repoPath)))
    Right (Just HashJWT
hashJwt) ->
      BaseUrl
-> RepoInfo
-> HashJWT
-> (Int -> IO ())
-> Cli (Either (SyncError DownloadEntitiesError) ())
downloadEntities BaseUrl
unisonShareUrl (Path -> RepoInfo
Share.pathRepoInfo Path
repoPath) HashJWT
hashJwt Int -> IO ()
downloadedCallback Cli (Either (SyncError DownloadEntitiesError) ())
-> (Either (SyncError DownloadEntitiesError) ()
    -> Either (SyncError PullError) CausalHash)
-> Cli (Either (SyncError PullError) CausalHash)
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
        Left SyncError DownloadEntitiesError
err -> SyncError PullError -> Either (SyncError PullError) CausalHash
forall a b. a -> Either a b
Left (DownloadEntitiesError -> PullError
PullError'DownloadEntities (DownloadEntitiesError -> PullError)
-> SyncError DownloadEntitiesError -> SyncError PullError
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> SyncError DownloadEntitiesError
err)
        Right () -> CausalHash -> Either (SyncError PullError) CausalHash
forall a b. b -> Either a b
Right (Hash32 -> CausalHash
hash32ToCausalHash (HashJWT -> Hash32
Share.hashJWTHash HashJWT
hashJwt))

------------------------------------------------------------------------------------------------------------------------
-- Download entities

downloadEntities ::
  -- | The Unison Share URL.
  BaseUrl ->
  -- | The repo to download from.
  Share.RepoInfo ->
  -- | The hash to download.
  Share.HashJWT ->
  -- | Callback that's given a number of entities we just downloaded.
  (Int -> IO ()) ->
  Cli (Either (SyncError Share.DownloadEntitiesError) ())
downloadEntities :: BaseUrl
-> RepoInfo
-> HashJWT
-> (Int -> IO ())
-> Cli (Either (SyncError DownloadEntitiesError) ())
downloadEntities BaseUrl
unisonShareUrl RepoInfo
repoInfo HashJWT
hashJwt Int -> IO ()
downloadedCallback = do
  Cli.Env {AuthenticatedHttpClient
authHTTPClient :: AuthenticatedHttpClient
$sel:authHTTPClient:Env :: Env -> AuthenticatedHttpClient
authHTTPClient, Codebase IO Symbol Ann
codebase :: Codebase IO Symbol Ann
$sel:codebase:Env :: Env -> Codebase IO Symbol Ann
codebase} <- Cli Env
forall r (m :: * -> *). MonadReader r m => m r
ask

  ((forall void.
  Either (SyncError DownloadEntitiesError) () -> Cli void)
 -> Cli (Either (SyncError DownloadEntitiesError) ()))
-> Cli (Either (SyncError DownloadEntitiesError) ())
forall a. ((forall void. a -> Cli void) -> Cli a) -> Cli a
Cli.label \forall void.
Either (SyncError DownloadEntitiesError) () -> Cli void
done -> do
    let failed :: SyncError Share.DownloadEntitiesError -> Cli void
        failed :: forall void. SyncError DownloadEntitiesError -> Cli void
failed = Either (SyncError DownloadEntitiesError) () -> Cli void
forall void.
Either (SyncError DownloadEntitiesError) () -> Cli void
done (Either (SyncError DownloadEntitiesError) () -> Cli void)
-> (SyncError DownloadEntitiesError
    -> Either (SyncError DownloadEntitiesError) ())
-> SyncError DownloadEntitiesError
-> Cli void
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncError DownloadEntitiesError
-> Either (SyncError DownloadEntitiesError) ()
forall a b. a -> Either a b
Left

    let hash :: Hash32
hash = HashJWT -> Hash32
Share.hashJWTHash HashJWT
hashJwt

    Maybe (NESet Hash32)
maybeTempEntities <-
      Transaction (Maybe EntityLocation) -> Cli (Maybe EntityLocation)
forall a. Transaction a -> Cli a
Cli.runTransaction (Hash32 -> Transaction (Maybe EntityLocation)
Q.entityLocation Hash32
hash) Cli (Maybe EntityLocation)
-> (Maybe EntityLocation -> Cli (Maybe (NESet Hash32)))
-> Cli (Maybe (NESet Hash32))
forall a b. Cli a -> (a -> Cli b) -> Cli b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        Just EntityLocation
Q.EntityInMainStorage -> Maybe (NESet Hash32) -> Cli (Maybe (NESet Hash32))
forall a. a -> Cli a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (NESet Hash32)
forall a. Maybe a
Nothing
        Just EntityLocation
Q.EntityInTempStorage -> Maybe (NESet Hash32) -> Cli (Maybe (NESet Hash32))
forall a. a -> Cli a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (NESet Hash32 -> Maybe (NESet Hash32)
forall a. a -> Maybe a
Just (Hash32 -> NESet Hash32
forall a. a -> NESet a
NESet.singleton Hash32
hash))
        Maybe EntityLocation
Nothing -> do
          let request :: IO (Either CodeserverTransportError DownloadEntitiesResponse)
request =
                AuthenticatedHttpClient
-> BaseUrl
-> DownloadEntitiesRequest
-> IO (Either CodeserverTransportError DownloadEntitiesResponse)
httpDownloadEntities
                  AuthenticatedHttpClient
authHTTPClient
                  BaseUrl
unisonShareUrl
                  Share.DownloadEntitiesRequest {RepoInfo
repoInfo :: RepoInfo
$sel:repoInfo:DownloadEntitiesRequest :: RepoInfo
repoInfo, $sel:hashes:DownloadEntitiesRequest :: NESet HashJWT
hashes = HashJWT -> NESet HashJWT
forall a. a -> NESet a
NESet.singleton HashJWT
hashJwt}
          NEMap Hash32 (Entity Text Hash32 HashJWT)
entities <-
            IO (Either CodeserverTransportError DownloadEntitiesResponse)
-> Cli (Either CodeserverTransportError DownloadEntitiesResponse)
forall a. IO a -> Cli a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (Either CodeserverTransportError DownloadEntitiesResponse)
request Cli (Either CodeserverTransportError DownloadEntitiesResponse)
-> (Either CodeserverTransportError DownloadEntitiesResponse
    -> Cli (NEMap Hash32 (Entity Text Hash32 HashJWT)))
-> Cli (NEMap Hash32 (Entity Text Hash32 HashJWT))
forall a b. Cli a -> (a -> Cli b) -> Cli b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
              Left CodeserverTransportError
err -> SyncError DownloadEntitiesError
-> Cli (NEMap Hash32 (Entity Text Hash32 HashJWT))
forall void. SyncError DownloadEntitiesError -> Cli void
failed (CodeserverTransportError -> SyncError DownloadEntitiesError
forall e. CodeserverTransportError -> SyncError e
TransportError CodeserverTransportError
err)
              Right (Share.DownloadEntitiesFailure DownloadEntitiesError
err) -> SyncError DownloadEntitiesError
-> Cli (NEMap Hash32 (Entity Text Hash32 HashJWT))
forall void. SyncError DownloadEntitiesError -> Cli void
failed (DownloadEntitiesError -> SyncError DownloadEntitiesError
forall e. e -> SyncError e
SyncError DownloadEntitiesError
err)
              Right (Share.DownloadEntitiesSuccess NEMap Hash32 (Entity Text Hash32 HashJWT)
entities) -> NEMap Hash32 (Entity Text Hash32 HashJWT)
-> Cli (NEMap Hash32 (Entity Text Hash32 HashJWT))
forall a. a -> Cli a
forall (f :: * -> *) a. Applicative f => a -> f a
pure NEMap Hash32 (Entity Text Hash32 HashJWT)
entities
          case NEMap Hash32 (Entity Text Hash32 HashJWT)
-> Either EntityValidationError ()
validateEntities NEMap Hash32 (Entity Text Hash32 HashJWT)
entities of
            Left EntityValidationError
err -> SyncError DownloadEntitiesError -> Cli ()
forall void. SyncError DownloadEntitiesError -> Cli void
failed (SyncError DownloadEntitiesError -> Cli ())
-> (EntityValidationError -> SyncError DownloadEntitiesError)
-> EntityValidationError
-> Cli ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DownloadEntitiesError -> SyncError DownloadEntitiesError
forall e. e -> SyncError e
SyncError (DownloadEntitiesError -> SyncError DownloadEntitiesError)
-> (EntityValidationError -> DownloadEntitiesError)
-> EntityValidationError
-> SyncError DownloadEntitiesError
forall b c a. (b -> c) -> (a -> b) -> a -> c
. EntityValidationError -> DownloadEntitiesError
Share.DownloadEntitiesEntityValidationFailure (EntityValidationError -> Cli ())
-> EntityValidationError -> Cli ()
forall a b. (a -> b) -> a -> b
$ EntityValidationError
err
            Right () -> () -> Cli ()
forall a. a -> Cli a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
          Set Hash32
tempEntities <- Transaction (Set Hash32) -> Cli (Set Hash32)
forall a. Transaction a -> Cli a
Cli.runTransaction (NEMap Hash32 (Entity Text Hash32 HashJWT)
-> Transaction (Set Hash32)
insertEntities NEMap Hash32 (Entity Text Hash32 HashJWT)
entities)
          IO () -> Cli ()
forall a. IO a -> Cli a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Int -> IO ()
downloadedCallback Int
1)
          pure (Set Hash32 -> Maybe (NESet Hash32)
forall a. Set a -> Maybe (NESet a)
NESet.nonEmptySet Set Hash32
tempEntities)

    Maybe (NESet Hash32) -> (NESet Hash32 -> Cli ()) -> Cli ()
forall (m :: * -> *) a.
Applicative m =>
Maybe a -> (a -> m ()) -> m ()
whenJust Maybe (NESet Hash32)
maybeTempEntities \NESet Hash32
tempEntities -> do
      let doCompleteTempEntities :: IO (Either (SyncError DownloadEntitiesError) ())
doCompleteTempEntities =
            AuthenticatedHttpClient
-> BaseUrl
-> (forall a. ((forall x. Transaction x -> IO x) -> IO a) -> IO a)
-> RepoInfo
-> (Int -> IO ())
-> NESet Hash32
-> IO (Either (SyncError DownloadEntitiesError) ())
completeTempEntities
              AuthenticatedHttpClient
authHTTPClient
              BaseUrl
unisonShareUrl
              ( \(forall x. Transaction x -> IO x) -> IO a
action ->
                  Codebase IO Symbol Ann -> forall x. (Connection -> IO x) -> IO x
forall (m :: * -> *) v a.
Codebase m v a -> forall x. (Connection -> m x) -> m x
Codebase.withConnection Codebase IO Symbol Ann
codebase \Connection
conn ->
                    (forall x. Transaction x -> IO x) -> IO a
action (Connection -> Transaction x -> IO x
forall (m :: * -> *) a.
(MonadIO m, HasCallStack) =>
Connection -> Transaction a -> m a
Sqlite.runTransaction Connection
conn)
              )
              RepoInfo
repoInfo
              Int -> IO ()
downloadedCallback
              NESet Hash32
tempEntities
      IO (Either (SyncError DownloadEntitiesError) ())
-> Cli (Either (SyncError DownloadEntitiesError) ())
forall a. IO a -> Cli a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (Either (SyncError DownloadEntitiesError) ())
doCompleteTempEntities Cli (Either (SyncError DownloadEntitiesError) ())
-> (Cli (Either (SyncError DownloadEntitiesError) ()) -> Cli ())
-> Cli ()
forall a b. a -> (a -> b) -> b
& (SyncError DownloadEntitiesError -> Cli ())
-> Cli (Either (SyncError DownloadEntitiesError) ()) -> Cli ()
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> m (Either a b) -> m b
onLeftM \SyncError DownloadEntitiesError
err ->
        SyncError DownloadEntitiesError -> Cli ()
forall void. SyncError DownloadEntitiesError -> Cli void
failed SyncError DownloadEntitiesError
err
    -- Since we may have just inserted and then deleted many temp entities, we attempt to recover some disk space by
    -- vacuuming after each pull. If the vacuum fails due to another open transaction on this connection, that's ok,
    -- we'll try vacuuming again next pull.
    Bool
_success <- IO Bool -> Cli Bool
forall a. IO a -> Cli a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Codebase IO Symbol Ann -> forall x. (Connection -> IO x) -> IO x
forall (m :: * -> *) v a.
Codebase m v a -> forall x. (Connection -> m x) -> m x
Codebase.withConnection Codebase IO Symbol Ann
codebase Connection -> IO Bool
Sqlite.vacuum)
    pure (() -> Either (SyncError DownloadEntitiesError) ()
forall a b. b -> Either a b
Right ())

-- | Validates the provided entities if and only if the environment variable `UNISON_ENTITY_VALIDATION` is set to "true".
validateEntities :: NEMap Hash32 (Share.Entity Text Hash32 Share.HashJWT) -> Either Share.EntityValidationError ()
validateEntities :: NEMap Hash32 (Entity Text Hash32 HashJWT)
-> Either EntityValidationError ()
validateEntities NEMap Hash32 (Entity Text Hash32 HashJWT)
entities =
  Bool
-> Either EntityValidationError ()
-> Either EntityValidationError ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
shouldValidateEntities (Either EntityValidationError ()
 -> Either EntityValidationError ())
-> Either EntityValidationError ()
-> Either EntityValidationError ()
forall a b. (a -> b) -> a -> b
$ do
    Map Hash32 (Entity Text Hash32 HashJWT)
-> (Hash32
    -> Entity Text Hash32 HashJWT -> Either EntityValidationError ())
-> Either EntityValidationError ()
forall i (t :: * -> *) (f :: * -> *) a b.
(FoldableWithIndex i t, Applicative f) =>
t a -> (i -> a -> f b) -> f ()
ifor_ (NEMap Hash32 (Entity Text Hash32 HashJWT)
-> Map Hash32 (Entity Text Hash32 HashJWT)
forall k a. NEMap k a -> Map k a
NEMap.toMap NEMap Hash32 (Entity Text Hash32 HashJWT)
entities) \Hash32
hash Entity Text Hash32 HashJWT
entity -> do
      let entityWithHashes :: Entity Text Hash32 Hash32
entityWithHashes = Entity Text Hash32 HashJWT
entity Entity Text Hash32 HashJWT
-> (Entity Text Hash32 HashJWT -> Entity Text Hash32 Hash32)
-> Entity Text Hash32 Hash32
forall a b. a -> (a -> b) -> b
& (HashJWT -> Identity Hash32)
-> Entity Text Hash32 HashJWT
-> Identity (Entity Text Hash32 Hash32)
forall (m :: * -> *) hash' hash text noSyncHash.
(Applicative m, Ord hash') =>
(hash -> m hash')
-> Entity text noSyncHash hash -> m (Entity text noSyncHash hash')
Share.entityHashes_ ((HashJWT -> Identity Hash32)
 -> Entity Text Hash32 HashJWT
 -> Identity (Entity Text Hash32 Hash32))
-> (HashJWT -> Hash32)
-> Entity Text Hash32 HashJWT
-> Entity Text Hash32 Hash32
forall s t a b. ASetter s t a b -> (a -> b) -> s -> t
%~ HashJWT -> Hash32
Share.hashJWTHash
      case Hash32 -> Entity Text Hash32 Hash32 -> Maybe EntityValidationError
EV.validateEntity Hash32
hash Entity Text Hash32 Hash32
entityWithHashes of
        Maybe EntityValidationError
Nothing -> () -> Either EntityValidationError ()
forall a. a -> Either EntityValidationError a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
        Just err :: EntityValidationError
err@(Share.EntityHashMismatch EntityType
et (Share.HashMismatchForEntity {Hash32
supplied :: Hash32
$sel:supplied:HashMismatchForEntity :: HashMismatchForEntity -> Hash32
supplied, Hash32
computed :: Hash32
$sel:computed:HashMismatchForEntity :: HashMismatchForEntity -> Hash32
computed})) ->
          let expectedMismatches :: Map Hash32 Hash32
expectedMismatches = case EntityType
et of
                EntityType
Share.TermComponentType -> Map Hash32 Hash32
expectedComponentHashMismatches
                EntityType
Share.DeclComponentType -> Map Hash32 Hash32
expectedComponentHashMismatches
                EntityType
Share.CausalType -> Map Hash32 Hash32
expectedCausalHashMismatches
                EntityType
_ -> Map Hash32 Hash32
forall a. Monoid a => a
mempty
           in case Hash32 -> Map Hash32 Hash32 -> Maybe Hash32
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup Hash32
supplied Map Hash32 Hash32
expectedMismatches of
                Just Hash32
expected
                  | Hash32
expected Hash32 -> Hash32 -> Bool
forall a. Eq a => a -> a -> Bool
== Hash32
computed -> () -> Either EntityValidationError ()
forall a. a -> Either EntityValidationError a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
                Maybe Hash32
_ -> do
                  EntityValidationError -> Either EntityValidationError ()
forall a b. a -> Either a b
Left EntityValidationError
err
        Just EntityValidationError
err -> do
          EntityValidationError -> Either EntityValidationError ()
forall a b. a -> Either a b
Left EntityValidationError
err

-- | Validate entities received from the server unless this flag is set to false.
validationEnvKey :: String
validationEnvKey :: String
validationEnvKey = String
"UNISON_ENTITY_VALIDATION"

shouldValidateEntities :: Bool
shouldValidateEntities :: Bool
shouldValidateEntities = IO Bool -> Bool
forall a. IO a -> a
unsafePerformIO (IO Bool -> Bool) -> IO Bool -> Bool
forall a b. (a -> b) -> a -> b
$ do
  String -> IO (Maybe String)
lookupEnv String
validationEnvKey IO (Maybe String) -> (Maybe String -> Bool) -> IO Bool
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
    Just String
"false" -> Bool
False
    Maybe String
_ -> Bool
True
{-# NOINLINE shouldValidateEntities #-}

type WorkerCount =
  TVar Int

newWorkerCount :: IO WorkerCount
newWorkerCount :: IO WorkerCount
newWorkerCount =
  Int -> IO WorkerCount
forall a. a -> IO (TVar a)
newTVarIO Int
0

recordWorking :: WorkerCount -> STM ()
recordWorking :: WorkerCount -> STM ()
recordWorking WorkerCount
sem =
  WorkerCount -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' WorkerCount
sem (Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)

recordNotWorking :: WorkerCount -> STM ()
recordNotWorking :: WorkerCount -> STM ()
recordNotWorking WorkerCount
sem =
  WorkerCount -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' WorkerCount
sem \Int
n -> Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1

-- What the dispatcher is to do
data DispatcherJob
  = DispatcherForkWorker (NESet Share.HashJWT)
  | DispatcherReturnEarlyBecauseDownloaderFailed (SyncError Share.DownloadEntitiesError)
  | DispatcherDone

-- | Finish downloading entities from Unison Share (or return the first failure to download something).
--
-- Precondition: the entities were *already* downloaded at some point in the past, and are now sitting in the
-- `temp_entity` table, waiting for their dependencies to arrive so they can be flushed to main storage.
completeTempEntities ::
  AuthenticatedHttpClient ->
  BaseUrl ->
  (forall a. ((forall x. Sqlite.Transaction x -> IO x) -> IO a) -> IO a) ->
  Share.RepoInfo ->
  (Int -> IO ()) ->
  NESet Hash32 ->
  IO (Either (SyncError Share.DownloadEntitiesError) ())
completeTempEntities :: AuthenticatedHttpClient
-> BaseUrl
-> (forall a. ((forall x. Transaction x -> IO x) -> IO a) -> IO a)
-> RepoInfo
-> (Int -> IO ())
-> NESet Hash32
-> IO (Either (SyncError DownloadEntitiesError) ())
completeTempEntities AuthenticatedHttpClient
httpClient BaseUrl
unisonShareUrl forall a. ((forall x. Transaction x -> IO x) -> IO a) -> IO a
connect RepoInfo
repoInfo Int -> IO ()
downloadedCallback NESet Hash32
initialNewTempEntities = do
  -- The set of hashes we still need to download
  TVar (Set HashJWT)
hashesVar <- Set HashJWT -> IO (TVar (Set HashJWT))
forall a. a -> IO (TVar a)
newTVarIO Set HashJWT
forall a. Set a
Set.empty

  -- The set of hashes that we haven't inserted yet, but will soon, because we've committed to downloading them.
  TVar (Set HashJWT)
uninsertedHashesVar <- Set HashJWT -> IO (TVar (Set HashJWT))
forall a. a -> IO (TVar a)
newTVarIO Set HashJWT
forall a. Set a
Set.empty

  -- The entities payloads (along with the jwts that we used to download them) that we've downloaded
  TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
entitiesQueue <- IO
  (TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT)))
forall a. IO (TQueue a)
newTQueueIO

  -- The sets of new (at the time of inserting, anyway) temp entity rows, which we need to elaborate, then download.
  TQueue (Set HashJWT, Maybe (NESet Hash32))
newTempEntitiesQueue <- IO (TQueue (Set HashJWT, Maybe (NESet Hash32)))
forall a. IO (TQueue a)
newTQueueIO

  -- How many workers (downloader / inserter / elaborator) are currently doing stuff.
  WorkerCount
workerCount <- IO WorkerCount
newWorkerCount

  -- The first download error seen by a downloader, if any.
  TMVar (SyncError DownloadEntitiesError)
downloaderFailedVar <- IO (TMVar (SyncError DownloadEntitiesError))
forall a. IO (TMVar a)
newEmptyTMVarIO

  -- Kick off the cycle of inserter->elaborator->dispatcher->downloader by giving the elaborator something to do
  STM () -> IO ()
forall a. STM a -> IO a
atomically (TQueue (Set HashJWT, Maybe (NESet Hash32))
-> (Set HashJWT, Maybe (NESet Hash32)) -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue TQueue (Set HashJWT, Maybe (NESet Hash32))
newTempEntitiesQueue (Set HashJWT
forall a. Set a
Set.empty, NESet Hash32 -> Maybe (NESet Hash32)
forall a. a -> Maybe a
Just NESet Hash32
initialNewTempEntities))

  (Scope -> IO (Either (SyncError DownloadEntitiesError) ()))
-> IO (Either (SyncError DownloadEntitiesError) ())
forall a. (Scope -> IO a) -> IO a
Ki.scoped \Scope
scope -> do
    Scope -> IO Void -> IO ()
Ki.fork_ Scope
scope (TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
-> TQueue (Set HashJWT, Maybe (NESet Hash32))
-> WorkerCount
-> IO Void
inserter TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
entitiesQueue TQueue (Set HashJWT, Maybe (NESet Hash32))
newTempEntitiesQueue WorkerCount
workerCount)
    Scope -> IO Void -> IO ()
Ki.fork_ Scope
scope (TVar (Set HashJWT)
-> TVar (Set HashJWT)
-> TQueue (Set HashJWT, Maybe (NESet Hash32))
-> WorkerCount
-> IO Void
elaborator TVar (Set HashJWT)
hashesVar TVar (Set HashJWT)
uninsertedHashesVar TQueue (Set HashJWT, Maybe (NESet Hash32))
newTempEntitiesQueue WorkerCount
workerCount)
    TVar (Set HashJWT)
-> TVar (Set HashJWT)
-> TQueue
     (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
-> TQueue (Set HashJWT, Maybe (NESet Hash32))
-> WorkerCount
-> TMVar (SyncError DownloadEntitiesError)
-> IO (Either (SyncError DownloadEntitiesError) ())
dispatcher TVar (Set HashJWT)
hashesVar TVar (Set HashJWT)
uninsertedHashesVar TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
entitiesQueue TQueue (Set HashJWT, Maybe (NESet Hash32))
newTempEntitiesQueue WorkerCount
workerCount TMVar (SyncError DownloadEntitiesError)
downloaderFailedVar
  where
    -- Dispatcher thread: "dequeue" from `hashesVar`, fork one-shot downloaders.
    --
    -- We stop when either all of the following are true:
    --
    --   - There are no outstanding workers (downloaders, inserter, elaboraror)
    --   - The inserter thread doesn't have any outstanding work enqueued (in `entitiesQueue`)
    --   - The elaborator thread doesn't have any outstanding work enqueued (in `newTempEntitiesQueue`)
    --
    -- Or:
    --
    --   - Some downloader failed to download something
    dispatcher ::
      TVar (Set Share.HashJWT) ->
      TVar (Set Share.HashJWT) ->
      TQueue (NESet Share.HashJWT, NEMap Hash32 (Share.Entity Text Hash32 Share.HashJWT)) ->
      TQueue (Set Share.HashJWT, Maybe (NESet Hash32)) ->
      WorkerCount ->
      TMVar (SyncError Share.DownloadEntitiesError) ->
      IO (Either (SyncError Share.DownloadEntitiesError) ())
    dispatcher :: TVar (Set HashJWT)
-> TVar (Set HashJWT)
-> TQueue
     (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
-> TQueue (Set HashJWT, Maybe (NESet Hash32))
-> WorkerCount
-> TMVar (SyncError DownloadEntitiesError)
-> IO (Either (SyncError DownloadEntitiesError) ())
dispatcher TVar (Set HashJWT)
hashesVar TVar (Set HashJWT)
uninsertedHashesVar TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
entitiesQueue TQueue (Set HashJWT, Maybe (NESet Hash32))
newTempEntitiesQueue WorkerCount
workerCount TMVar (SyncError DownloadEntitiesError)
downloaderFailedVar =
      (Scope -> IO (Either (SyncError DownloadEntitiesError) ()))
-> IO (Either (SyncError DownloadEntitiesError) ())
forall a. (Scope -> IO a) -> IO a
Ki.scoped \Scope
scope ->
        let loop :: IO (Either (SyncError Share.DownloadEntitiesError) ())
            loop :: IO (Either (SyncError DownloadEntitiesError) ())
loop =
              STM DispatcherJob -> IO DispatcherJob
forall a. STM a -> IO a
atomically (STM DispatcherJob
checkIfDownloaderFailedMode STM DispatcherJob -> STM DispatcherJob -> STM DispatcherJob
forall a. STM a -> STM a -> STM a
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> STM DispatcherJob
dispatchWorkMode STM DispatcherJob -> STM DispatcherJob -> STM DispatcherJob
forall a. STM a -> STM a -> STM a
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> STM DispatcherJob
checkIfDoneMode) IO DispatcherJob
-> (DispatcherJob
    -> IO (Either (SyncError DownloadEntitiesError) ()))
-> IO (Either (SyncError DownloadEntitiesError) ())
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
                DispatcherJob
DispatcherDone -> Either (SyncError DownloadEntitiesError) ()
-> IO (Either (SyncError DownloadEntitiesError) ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (() -> Either (SyncError DownloadEntitiesError) ()
forall a b. b -> Either a b
Right ())
                DispatcherReturnEarlyBecauseDownloaderFailed SyncError DownloadEntitiesError
err -> Either (SyncError DownloadEntitiesError) ()
-> IO (Either (SyncError DownloadEntitiesError) ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (SyncError DownloadEntitiesError
-> Either (SyncError DownloadEntitiesError) ()
forall a b. a -> Either a b
Left SyncError DownloadEntitiesError
err)
                DispatcherForkWorker NESet HashJWT
hashes -> do
                  STM () -> IO ()
forall a. STM a -> IO a
atomically do
                    -- Limit number of simultaneous downloaders (plus 2, for inserter and elaborator)
                    Int
workers <- WorkerCount -> STM Int
forall a. TVar a -> STM a
readTVar WorkerCount
workerCount
                    Bool -> STM ()
check (Int
workers Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
maxSimultaneousPullDownloaders Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
2)
                    -- we do need to record the downloader as working outside of the worker thread, not inside.
                    -- otherwise, we might erroneously fall through the teardown logic below and conclude there's
                    -- nothing more for the dispatcher to do, when in fact a downloader thread just hasn't made it as
                    -- far as recording its own existence
                    WorkerCount -> STM ()
recordWorking WorkerCount
workerCount
                  Thread ()
_ <-
                    forall a. Scope -> IO a -> IO (Thread a)
Ki.fork @() Scope
scope do
                      TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
-> WorkerCount
-> NESet HashJWT
-> IO (Either (SyncError DownloadEntitiesError) ())
downloader TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
entitiesQueue WorkerCount
workerCount NESet HashJWT
hashes IO (Either (SyncError DownloadEntitiesError) ())
-> (IO (Either (SyncError DownloadEntitiesError) ()) -> IO ())
-> IO ()
forall a b. a -> (a -> b) -> b
& (SyncError DownloadEntitiesError -> IO ())
-> IO (Either (SyncError DownloadEntitiesError) ()) -> IO ()
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> m (Either a b) -> m b
onLeftM \SyncError DownloadEntitiesError
err ->
                        IO Bool -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (TMVar (SyncError DownloadEntitiesError)
-> SyncError DownloadEntitiesError -> STM Bool
forall a. TMVar a -> a -> STM Bool
tryPutTMVar TMVar (SyncError DownloadEntitiesError)
downloaderFailedVar SyncError DownloadEntitiesError
err))
                  IO (Either (SyncError DownloadEntitiesError) ())
loop
         in IO (Either (SyncError DownloadEntitiesError) ())
loop
      where
        checkIfDownloaderFailedMode :: STM DispatcherJob
        checkIfDownloaderFailedMode :: STM DispatcherJob
checkIfDownloaderFailedMode =
          SyncError DownloadEntitiesError -> DispatcherJob
DispatcherReturnEarlyBecauseDownloaderFailed (SyncError DownloadEntitiesError -> DispatcherJob)
-> STM (SyncError DownloadEntitiesError) -> STM DispatcherJob
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TMVar (SyncError DownloadEntitiesError)
-> STM (SyncError DownloadEntitiesError)
forall a. TMVar a -> STM a
readTMVar TMVar (SyncError DownloadEntitiesError)
downloaderFailedVar

        dispatchWorkMode :: STM DispatcherJob
        dispatchWorkMode :: STM DispatcherJob
dispatchWorkMode = do
          Set HashJWT
hashes <- TVar (Set HashJWT) -> STM (Set HashJWT)
forall a. TVar a -> STM a
readTVar TVar (Set HashJWT)
hashesVar
          Bool -> STM ()
check (Bool -> Bool
not (Set HashJWT -> Bool
forall a. Set a -> Bool
Set.null Set HashJWT
hashes))
          let (Set HashJWT
hashes1, Set HashJWT
hashes2) = Int -> Set HashJWT -> (Set HashJWT, Set HashJWT)
forall a. Int -> Set a -> (Set a, Set a)
Set.splitAt Int
syncChunkSize Set HashJWT
hashes
          TVar (Set HashJWT) -> (Set HashJWT -> Set HashJWT) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Set HashJWT)
uninsertedHashesVar (Set HashJWT -> Set HashJWT -> Set HashJWT
forall a. Ord a => Set a -> Set a -> Set a
Set.union Set HashJWT
hashes1)
          TVar (Set HashJWT) -> Set HashJWT -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Set HashJWT)
hashesVar Set HashJWT
hashes2
          pure (NESet HashJWT -> DispatcherJob
DispatcherForkWorker (Set HashJWT -> NESet HashJWT
forall a. Set a -> NESet a
NESet.unsafeFromSet Set HashJWT
hashes1))

        -- Check to see if there are no hashes left to download, no outstanding workers, and no work in either queue
        checkIfDoneMode :: STM DispatcherJob
        checkIfDoneMode :: STM DispatcherJob
checkIfDoneMode = do
          Int
workers <- WorkerCount -> STM Int
forall a. TVar a -> STM a
readTVar WorkerCount
workerCount
          Bool -> STM ()
check (Int
workers Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0)
          TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
-> STM Bool
forall a. TQueue a -> STM Bool
isEmptyTQueue TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
entitiesQueue STM Bool -> (Bool -> STM ()) -> STM ()
forall a b. STM a -> (a -> STM b) -> STM b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Bool -> STM ()
check
          TQueue (Set HashJWT, Maybe (NESet Hash32)) -> STM Bool
forall a. TQueue a -> STM Bool
isEmptyTQueue TQueue (Set HashJWT, Maybe (NESet Hash32))
newTempEntitiesQueue STM Bool -> (Bool -> STM ()) -> STM ()
forall a b. STM a -> (a -> STM b) -> STM b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Bool -> STM ()
check
          pure DispatcherJob
DispatcherDone

    -- Downloader thread: download entities, (if successful) enqueue to `entitiesQueue`
    downloader ::
      TQueue (NESet Share.HashJWT, NEMap Hash32 (Share.Entity Text Hash32 Share.HashJWT)) ->
      WorkerCount ->
      NESet Share.HashJWT ->
      IO (Either (SyncError Share.DownloadEntitiesError) ())
    downloader :: TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
-> WorkerCount
-> NESet HashJWT
-> IO (Either (SyncError DownloadEntitiesError) ())
downloader TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
entitiesQueue WorkerCount
workerCount NESet HashJWT
hashes = do
      AuthenticatedHttpClient
-> BaseUrl
-> DownloadEntitiesRequest
-> IO (Either CodeserverTransportError DownloadEntitiesResponse)
httpDownloadEntities AuthenticatedHttpClient
httpClient BaseUrl
unisonShareUrl Share.DownloadEntitiesRequest {RepoInfo
$sel:repoInfo:DownloadEntitiesRequest :: RepoInfo
repoInfo :: RepoInfo
repoInfo, NESet HashJWT
$sel:hashes:DownloadEntitiesRequest :: NESet HashJWT
hashes :: NESet HashJWT
hashes} IO (Either CodeserverTransportError DownloadEntitiesResponse)
-> (Either CodeserverTransportError DownloadEntitiesResponse
    -> IO (Either (SyncError DownloadEntitiesError) ()))
-> IO (Either (SyncError DownloadEntitiesError) ())
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        Left CodeserverTransportError
err -> do
          STM () -> IO ()
forall a. STM a -> IO a
atomically (WorkerCount -> STM ()
recordNotWorking WorkerCount
workerCount)
          pure (SyncError DownloadEntitiesError
-> Either (SyncError DownloadEntitiesError) ()
forall a b. a -> Either a b
Left (CodeserverTransportError -> SyncError DownloadEntitiesError
forall e. CodeserverTransportError -> SyncError e
TransportError CodeserverTransportError
err))
        Right (Share.DownloadEntitiesFailure DownloadEntitiesError
err) -> do
          STM () -> IO ()
forall a. STM a -> IO a
atomically (WorkerCount -> STM ()
recordNotWorking WorkerCount
workerCount)
          pure (SyncError DownloadEntitiesError
-> Either (SyncError DownloadEntitiesError) ()
forall a b. a -> Either a b
Left (DownloadEntitiesError -> SyncError DownloadEntitiesError
forall e. e -> SyncError e
SyncError DownloadEntitiesError
err))
        Right (Share.DownloadEntitiesSuccess NEMap Hash32 (Entity Text Hash32 HashJWT)
entities) -> do
          Int -> IO ()
downloadedCallback (NESet HashJWT -> Int
forall a. NESet a -> Int
NESet.size NESet HashJWT
hashes)
          case NEMap Hash32 (Entity Text Hash32 HashJWT)
-> Either EntityValidationError ()
validateEntities NEMap Hash32 (Entity Text Hash32 HashJWT)
entities of
            Left EntityValidationError
err -> Either (SyncError DownloadEntitiesError) ()
-> IO (Either (SyncError DownloadEntitiesError) ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either (SyncError DownloadEntitiesError) ()
 -> IO (Either (SyncError DownloadEntitiesError) ()))
-> (EntityValidationError
    -> Either (SyncError DownloadEntitiesError) ())
-> EntityValidationError
-> IO (Either (SyncError DownloadEntitiesError) ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncError DownloadEntitiesError
-> Either (SyncError DownloadEntitiesError) ()
forall a b. a -> Either a b
Left (SyncError DownloadEntitiesError
 -> Either (SyncError DownloadEntitiesError) ())
-> (EntityValidationError -> SyncError DownloadEntitiesError)
-> EntityValidationError
-> Either (SyncError DownloadEntitiesError) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DownloadEntitiesError -> SyncError DownloadEntitiesError
forall e. e -> SyncError e
SyncError (DownloadEntitiesError -> SyncError DownloadEntitiesError)
-> (EntityValidationError -> DownloadEntitiesError)
-> EntityValidationError
-> SyncError DownloadEntitiesError
forall b c a. (b -> c) -> (a -> b) -> a -> c
. EntityValidationError -> DownloadEntitiesError
Share.DownloadEntitiesEntityValidationFailure (EntityValidationError
 -> IO (Either (SyncError DownloadEntitiesError) ()))
-> EntityValidationError
-> IO (Either (SyncError DownloadEntitiesError) ())
forall a b. (a -> b) -> a -> b
$ EntityValidationError
err
            Right () -> do
              STM () -> IO ()
forall a. STM a -> IO a
atomically do
                TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
-> (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
-> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
entitiesQueue (NESet HashJWT
hashes, NEMap Hash32 (Entity Text Hash32 HashJWT)
entities)
                WorkerCount -> STM ()
recordNotWorking WorkerCount
workerCount
              pure (() -> Either (SyncError DownloadEntitiesError) ()
forall a b. b -> Either a b
Right ())

    -- Inserter thread: dequeue from `entitiesQueue`, insert entities, enqueue to `newTempEntitiesQueue`
    inserter ::
      TQueue (NESet Share.HashJWT, NEMap Hash32 (Share.Entity Text Hash32 Share.HashJWT)) ->
      TQueue (Set Share.HashJWT, Maybe (NESet Hash32)) ->
      WorkerCount ->
      IO Void
    inserter :: TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
-> TQueue (Set HashJWT, Maybe (NESet Hash32))
-> WorkerCount
-> IO Void
inserter TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
entitiesQueue TQueue (Set HashJWT, Maybe (NESet Hash32))
newTempEntitiesQueue WorkerCount
workerCount =
      ((forall x. Transaction x -> IO x) -> IO Void) -> IO Void
forall a. ((forall x. Transaction x -> IO x) -> IO a) -> IO a
connect \forall x. Transaction x -> IO x
runTransaction ->
        IO () -> IO Void
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever do
          (NESet HashJWT
hashJwts, NEMap Hash32 (Entity Text Hash32 HashJWT)
entities) <-
            STM (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
-> IO (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
forall a. STM a -> IO a
atomically do
              (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
entities <- TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
-> STM (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
forall a. TQueue a -> STM a
readTQueue TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
entitiesQueue
              WorkerCount -> STM ()
recordWorking WorkerCount
workerCount
              pure (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
entities
          Set Hash32
newTempEntities0 <-
            Transaction (Set Hash32) -> IO (Set Hash32)
forall x. Transaction x -> IO x
runTransaction do
              NEMap Hash32 (Entity Text Hash32 HashJWT)
-> NonEmpty (Hash32, Entity Text Hash32 HashJWT)
forall k a. NEMap k a -> NonEmpty (k, a)
NEMap.toList NEMap Hash32 (Entity Text Hash32 HashJWT)
entities NonEmpty (Hash32, Entity Text Hash32 HashJWT)
-> (NonEmpty (Hash32, Entity Text Hash32 HashJWT)
    -> Transaction (Set Hash32))
-> Transaction (Set Hash32)
forall a b. a -> (a -> b) -> b
& ((Hash32, Entity Text Hash32 HashJWT) -> Transaction (Set Hash32))
-> NonEmpty (Hash32, Entity Text Hash32 HashJWT)
-> Transaction (Set Hash32)
forall (m :: * -> *) (f :: * -> *) b a.
(Monad m, Foldable f, Monoid b) =>
(a -> m b) -> f a -> m b
foldMapM \(Hash32
hash, Entity Text Hash32 HashJWT
entity) ->
                Hash32 -> Entity Text Hash32 HashJWT -> Transaction EntityLocation
upsertEntitySomewhere Hash32
hash Entity Text Hash32 HashJWT
entity Transaction EntityLocation
-> (EntityLocation -> Set Hash32) -> Transaction (Set Hash32)
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
                  EntityLocation
Q.EntityInMainStorage -> Set Hash32
forall a. Set a
Set.empty
                  EntityLocation
Q.EntityInTempStorage -> Hash32 -> Set Hash32
forall a. a -> Set a
Set.singleton Hash32
hash
          STM () -> IO ()
forall a. STM a -> IO a
atomically do
            TQueue (Set HashJWT, Maybe (NESet Hash32))
-> (Set HashJWT, Maybe (NESet Hash32)) -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue TQueue (Set HashJWT, Maybe (NESet Hash32))
newTempEntitiesQueue (NESet HashJWT -> Set HashJWT
forall a. NESet a -> Set a
NESet.toSet NESet HashJWT
hashJwts, Set Hash32 -> Maybe (NESet Hash32)
forall a. Set a -> Maybe (NESet a)
NESet.nonEmptySet Set Hash32
newTempEntities0)
            WorkerCount -> STM ()
recordNotWorking WorkerCount
workerCount

    -- Elaborator thread: dequeue from `newTempEntitiesQueue`, elaborate, "enqueue" to `hashesVar`
    elaborator ::
      TVar (Set Share.HashJWT) ->
      TVar (Set Share.HashJWT) ->
      TQueue (Set Share.HashJWT, Maybe (NESet Hash32)) ->
      WorkerCount ->
      IO Void
    elaborator :: TVar (Set HashJWT)
-> TVar (Set HashJWT)
-> TQueue (Set HashJWT, Maybe (NESet Hash32))
-> WorkerCount
-> IO Void
elaborator TVar (Set HashJWT)
hashesVar TVar (Set HashJWT)
uninsertedHashesVar TQueue (Set HashJWT, Maybe (NESet Hash32))
newTempEntitiesQueue WorkerCount
workerCount =
      ((forall x. Transaction x -> IO x) -> IO Void) -> IO Void
forall a. ((forall x. Transaction x -> IO x) -> IO a) -> IO a
connect \forall x. Transaction x -> IO x
runTransaction ->
        IO () -> IO Void
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever do
          Maybe (NESet Hash32)
maybeNewTempEntities <-
            STM (Maybe (NESet Hash32)) -> IO (Maybe (NESet Hash32))
forall a. STM a -> IO a
atomically do
              (Set HashJWT
hashJwts, Maybe (NESet Hash32)
mayNewTempEntities) <- TQueue (Set HashJWT, Maybe (NESet Hash32))
-> STM (Set HashJWT, Maybe (NESet Hash32))
forall a. TQueue a -> STM a
readTQueue TQueue (Set HashJWT, Maybe (NESet Hash32))
newTempEntitiesQueue
              -- Avoid unnecessary retaining of these hashes to keep memory usage more stable. This algorithm would
              -- still be correct if we never delete from `uninsertedHashes`.
              --
              -- We remove the inserted hashes from uninsertedHashesVar at this point rather than right after insertion
              -- in order to ensure that no running transaction of the elaborator is viewing a snapshot that precedes
              -- the snapshot that inserted those hashes.
              TVar (Set HashJWT) -> (Set HashJWT -> Set HashJWT) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Set HashJWT)
uninsertedHashesVar \Set HashJWT
uninsertedHashes -> Set HashJWT -> Set HashJWT -> Set HashJWT
forall a. Ord a => Set a -> Set a -> Set a
Set.difference Set HashJWT
uninsertedHashes Set HashJWT
hashJwts
              case Maybe (NESet Hash32)
mayNewTempEntities of
                Maybe (NESet Hash32)
Nothing -> Maybe (NESet Hash32) -> STM (Maybe (NESet Hash32))
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (NESet Hash32)
forall a. Maybe a
Nothing
                Just NESet Hash32
newTempEntities -> do
                  WorkerCount -> STM ()
recordWorking WorkerCount
workerCount
                  pure (NESet Hash32 -> Maybe (NESet Hash32)
forall a. a -> Maybe a
Just NESet Hash32
newTempEntities)
          Maybe (NESet Hash32) -> (NESet Hash32 -> IO ()) -> IO ()
forall (m :: * -> *) a.
Applicative m =>
Maybe a -> (a -> m ()) -> m ()
whenJust Maybe (NESet Hash32)
maybeNewTempEntities \NESet Hash32
newTempEntities -> do
            Set HashJWT
newElaboratedHashes <- Transaction (Set HashJWT) -> IO (Set HashJWT)
forall x. Transaction x -> IO x
runTransaction (NESet Hash32 -> Transaction (Set HashJWT)
elaborateHashes NESet Hash32
newTempEntities)
            STM () -> IO ()
forall a. STM a -> IO a
atomically do
              Set HashJWT
uninsertedHashes <- TVar (Set HashJWT) -> STM (Set HashJWT)
forall a. TVar a -> STM a
readTVar TVar (Set HashJWT)
uninsertedHashesVar
              Set HashJWT
hashes0 <- TVar (Set HashJWT) -> STM (Set HashJWT)
forall a. TVar a -> STM a
readTVar TVar (Set HashJWT)
hashesVar
              TVar (Set HashJWT) -> Set HashJWT -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Set HashJWT)
hashesVar (Set HashJWT -> STM ()) -> Set HashJWT -> STM ()
forall a b. (a -> b) -> a -> b
$! Set HashJWT -> Set HashJWT -> Set HashJWT
forall a. Ord a => Set a -> Set a -> Set a
Set.union (Set HashJWT -> Set HashJWT -> Set HashJWT
forall a. Ord a => Set a -> Set a -> Set a
Set.difference Set HashJWT
newElaboratedHashes Set HashJWT
uninsertedHashes) Set HashJWT
hashes0
              WorkerCount -> STM ()
recordNotWorking WorkerCount
workerCount

-- | Insert entities into the database, and return the subset that went into temp storage (`temp_entitiy`) rather than
-- of main storage (`object` / `causal`) due to missing dependencies.
insertEntities :: NEMap Hash32 (Share.Entity Text Hash32 Share.HashJWT) -> Sqlite.Transaction (Set Hash32)
insertEntities :: NEMap Hash32 (Entity Text Hash32 HashJWT)
-> Transaction (Set Hash32)
insertEntities NEMap Hash32 (Entity Text Hash32 HashJWT)
entities =
  NEMap Hash32 (Entity Text Hash32 HashJWT)
-> NonEmpty (Hash32, Entity Text Hash32 HashJWT)
forall k a. NEMap k a -> NonEmpty (k, a)
NEMap.toList NEMap Hash32 (Entity Text Hash32 HashJWT)
entities NonEmpty (Hash32, Entity Text Hash32 HashJWT)
-> (NonEmpty (Hash32, Entity Text Hash32 HashJWT)
    -> Transaction (Set Hash32))
-> Transaction (Set Hash32)
forall a b. a -> (a -> b) -> b
& ((Hash32, Entity Text Hash32 HashJWT) -> Transaction (Set Hash32))
-> NonEmpty (Hash32, Entity Text Hash32 HashJWT)
-> Transaction (Set Hash32)
forall (m :: * -> *) (f :: * -> *) b a.
(Monad m, Foldable f, Monoid b) =>
(a -> m b) -> f a -> m b
foldMapM \(Hash32
hash, Entity Text Hash32 HashJWT
entity) ->
    Hash32 -> Entity Text Hash32 HashJWT -> Transaction EntityLocation
upsertEntitySomewhere Hash32
hash Entity Text Hash32 HashJWT
entity Transaction EntityLocation
-> (EntityLocation -> Set Hash32) -> Transaction (Set Hash32)
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
      EntityLocation
Q.EntityInMainStorage -> Set Hash32
forall a. Set a
Set.empty
      EntityLocation
Q.EntityInTempStorage -> Hash32 -> Set Hash32
forall a. a -> Set a
Set.singleton Hash32
hash

------------------------------------------------------------------------------------------------------------------------
-- Get causal hash by path

-- | Get the causal hash of a path hosted on Unison Share.
getCausalHashByPath ::
  -- | The Unison Share URL.
  BaseUrl ->
  Share.Path ->
  Cli (Either (SyncError GetCausalHashByPathError) (Maybe Share.HashJWT))
getCausalHashByPath :: BaseUrl
-> Path
-> Cli
     (Either (SyncError GetCausalHashByPathError) (Maybe HashJWT))
getCausalHashByPath BaseUrl
unisonShareUrl Path
repoPath = do
  Cli.Env {AuthenticatedHttpClient
$sel:authHTTPClient:Env :: Env -> AuthenticatedHttpClient
authHTTPClient :: AuthenticatedHttpClient
authHTTPClient} <- Cli Env
forall r (m :: * -> *). MonadReader r m => m r
ask
  IO (Either CodeserverTransportError GetCausalHashByPathResponse)
-> Cli
     (Either CodeserverTransportError GetCausalHashByPathResponse)
forall a. IO a -> Cli a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (AuthenticatedHttpClient
-> BaseUrl
-> GetCausalHashByPathRequest
-> IO (Either CodeserverTransportError GetCausalHashByPathResponse)
httpGetCausalHashByPath AuthenticatedHttpClient
authHTTPClient BaseUrl
unisonShareUrl (Path -> GetCausalHashByPathRequest
Share.GetCausalHashByPathRequest Path
repoPath)) Cli (Either CodeserverTransportError GetCausalHashByPathResponse)
-> (Either CodeserverTransportError GetCausalHashByPathResponse
    -> Either (SyncError GetCausalHashByPathError) (Maybe HashJWT))
-> Cli
     (Either (SyncError GetCausalHashByPathError) (Maybe HashJWT))
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
    Left CodeserverTransportError
err -> SyncError GetCausalHashByPathError
-> Either (SyncError GetCausalHashByPathError) (Maybe HashJWT)
forall a b. a -> Either a b
Left (CodeserverTransportError -> SyncError GetCausalHashByPathError
forall e. CodeserverTransportError -> SyncError e
TransportError CodeserverTransportError
err)
    Right (Share.GetCausalHashByPathSuccess Maybe HashJWT
maybeHashJwt) -> Maybe HashJWT
-> Either (SyncError GetCausalHashByPathError) (Maybe HashJWT)
forall a b. b -> Either a b
Right Maybe HashJWT
maybeHashJwt
    Right (Share.GetCausalHashByPathNoReadPermission Path
_) ->
      SyncError GetCausalHashByPathError
-> Either (SyncError GetCausalHashByPathError) (Maybe HashJWT)
forall a b. a -> Either a b
Left (GetCausalHashByPathError -> SyncError GetCausalHashByPathError
forall e. e -> SyncError e
SyncError (Path -> GetCausalHashByPathError
GetCausalHashByPathErrorNoReadPermission Path
repoPath))
    Right (Share.GetCausalHashByPathInvalidRepoInfo Text
err RepoInfo
repoInfo) ->
      SyncError GetCausalHashByPathError
-> Either (SyncError GetCausalHashByPathError) (Maybe HashJWT)
forall a b. a -> Either a b
Left (GetCausalHashByPathError -> SyncError GetCausalHashByPathError
forall e. e -> SyncError e
SyncError (Text -> RepoInfo -> GetCausalHashByPathError
GetCausalHashByPathErrorInvalidRepoInfo Text
err RepoInfo
repoInfo))
    Right GetCausalHashByPathResponse
Share.GetCausalHashByPathUserNotFound ->
      SyncError GetCausalHashByPathError
-> Either (SyncError GetCausalHashByPathError) (Maybe HashJWT)
forall a b. a -> Either a b
Left (GetCausalHashByPathError -> SyncError GetCausalHashByPathError
forall e. e -> SyncError e
SyncError (GetCausalHashByPathError -> SyncError GetCausalHashByPathError)
-> GetCausalHashByPathError -> SyncError GetCausalHashByPathError
forall a b. (a -> b) -> a -> b
$ RepoInfo -> GetCausalHashByPathError
GetCausalHashByPathErrorUserNotFound (Path -> RepoInfo
Share.pathRepoInfo Path
repoPath))

------------------------------------------------------------------------------------------------------------------------
-- Upload entities

data UploadDispatcherJob
  = UploadDispatcherReturnFailure (SyncError Share.UploadEntitiesError)
  | UploadDispatcherForkWorkerWhenAvailable (NESet Hash32)
  | UploadDispatcherForkWorker (NESet Hash32)
  | UploadDispatcherDone

-- | Upload a set of entities to Unison Share. If the server responds that it cannot yet store any hash(es) due to
-- missing dependencies, send those dependencies too, and on and on, until the server stops responding that it's missing
-- anything.
--
-- Returns true on success, false on failure (because the user does not have write permission).
uploadEntities ::
  BaseUrl ->
  Share.RepoInfo ->
  NESet Hash32 ->
  (Int -> IO ()) ->
  Cli (Either (SyncError Share.UploadEntitiesError) ())
uploadEntities :: BaseUrl
-> RepoInfo
-> NESet Hash32
-> (Int -> IO ())
-> Cli (Either (SyncError UploadEntitiesError) ())
uploadEntities BaseUrl
unisonShareUrl RepoInfo
repoInfo NESet Hash32
hashes0 Int -> IO ()
uploadedCallback = do
  Cli.Env {AuthenticatedHttpClient
$sel:authHTTPClient:Env :: Env -> AuthenticatedHttpClient
authHTTPClient :: AuthenticatedHttpClient
authHTTPClient, Codebase IO Symbol Ann
$sel:codebase:Env :: Env -> Codebase IO Symbol Ann
codebase :: Codebase IO Symbol Ann
codebase} <- Cli Env
forall r (m :: * -> *). MonadReader r m => m r
ask

  IO (Either (SyncError UploadEntitiesError) ())
-> Cli (Either (SyncError UploadEntitiesError) ())
forall a. IO a -> Cli a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO do
    TVar (Set Hash32)
hashesVar <- Set Hash32 -> IO (TVar (Set Hash32))
forall a. a -> IO (TVar a)
newTVarIO (NESet Hash32 -> Set Hash32
forall a. NESet a -> Set a
NESet.toSet NESet Hash32
hashes0)
    -- Semantically, this is the set of hashes we've uploaded so far, but we do delete from it when it's safe to, so it
    -- doesn't grow unbounded. It's used to filter out hashes that would be duplicate uploads: the server, when
    -- responding to any particular upload request, may declare that it still needs some hashes that we're in the
    -- process of uploading from another thread.
    TVar (Set Hash32)
dedupeVar <- Set Hash32 -> IO (TVar (Set Hash32))
forall a. a -> IO (TVar a)
newTVarIO Set Hash32
forall a. Set a
Set.empty
    WorkerCount
nextWorkerIdVar <- Int -> IO WorkerCount
forall a. a -> IO (TVar a)
newTVarIO Int
0
    TVar (Set Int)
workersVar <- Set Int -> IO (TVar (Set Int))
forall a. a -> IO (TVar a)
newTVarIO Set Int
forall a. Set a
Set.empty
    TMVar (SyncError UploadEntitiesError)
workerFailedVar <- IO (TMVar (SyncError UploadEntitiesError))
forall a. IO (TMVar a)
newEmptyTMVarIO

    (Scope -> IO (Either (SyncError UploadEntitiesError) ()))
-> IO (Either (SyncError UploadEntitiesError) ())
forall a. (Scope -> IO a) -> IO a
Ki.scoped \Scope
scope ->
      Scope
-> AuthenticatedHttpClient
-> (forall x. Transaction x -> IO x)
-> TVar (Set Hash32)
-> TVar (Set Hash32)
-> WorkerCount
-> TVar (Set Int)
-> TMVar (SyncError UploadEntitiesError)
-> IO (Either (SyncError UploadEntitiesError) ())
dispatcher
        Scope
scope
        AuthenticatedHttpClient
authHTTPClient
        (Codebase IO Symbol Ann -> Transaction a -> IO a
forall (m :: * -> *) v a b.
MonadIO m =>
Codebase m v a -> Transaction b -> m b
Codebase.runTransaction Codebase IO Symbol Ann
codebase)
        TVar (Set Hash32)
hashesVar
        TVar (Set Hash32)
dedupeVar
        WorkerCount
nextWorkerIdVar
        TVar (Set Int)
workersVar
        TMVar (SyncError UploadEntitiesError)
workerFailedVar
  where
    dispatcher ::
      Ki.Scope ->
      AuthenticatedHttpClient ->
      (forall a. Sqlite.Transaction a -> IO a) ->
      TVar (Set Hash32) ->
      TVar (Set Hash32) ->
      TVar Int ->
      TVar (Set Int) ->
      TMVar (SyncError Share.UploadEntitiesError) ->
      IO (Either (SyncError Share.UploadEntitiesError) ())
    dispatcher :: Scope
-> AuthenticatedHttpClient
-> (forall x. Transaction x -> IO x)
-> TVar (Set Hash32)
-> TVar (Set Hash32)
-> WorkerCount
-> TVar (Set Int)
-> TMVar (SyncError UploadEntitiesError)
-> IO (Either (SyncError UploadEntitiesError) ())
dispatcher Scope
scope AuthenticatedHttpClient
httpClient forall x. Transaction x -> IO x
runTransaction TVar (Set Hash32)
hashesVar TVar (Set Hash32)
dedupeVar WorkerCount
nextWorkerIdVar TVar (Set Int)
workersVar TMVar (SyncError UploadEntitiesError)
workerFailedVar = do
      IO (Either (SyncError UploadEntitiesError) ())
loop
      where
        loop :: IO (Either (SyncError Share.UploadEntitiesError) ())
        loop :: IO (Either (SyncError UploadEntitiesError) ())
loop =
          [STM UploadDispatcherJob]
-> IO (Either (SyncError UploadEntitiesError) ())
doJob [STM UploadDispatcherJob
checkForFailureMode, STM UploadDispatcherJob
dispatchWorkMode, STM UploadDispatcherJob
checkIfDoneMode]

        doJob :: [STM UploadDispatcherJob] -> IO (Either (SyncError Share.UploadEntitiesError) ())
        doJob :: [STM UploadDispatcherJob]
-> IO (Either (SyncError UploadEntitiesError) ())
doJob [STM UploadDispatcherJob]
jobs =
          STM UploadDispatcherJob -> IO UploadDispatcherJob
forall a. STM a -> IO a
atomically ([STM UploadDispatcherJob] -> STM UploadDispatcherJob
forall (t :: * -> *) (f :: * -> *) a.
(Foldable t, Alternative f) =>
t (f a) -> f a
asum [STM UploadDispatcherJob]
jobs) IO UploadDispatcherJob
-> (UploadDispatcherJob
    -> IO (Either (SyncError UploadEntitiesError) ()))
-> IO (Either (SyncError UploadEntitiesError) ())
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
            UploadDispatcherReturnFailure SyncError UploadEntitiesError
err -> Either (SyncError UploadEntitiesError) ()
-> IO (Either (SyncError UploadEntitiesError) ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (SyncError UploadEntitiesError
-> Either (SyncError UploadEntitiesError) ()
forall a b. a -> Either a b
Left SyncError UploadEntitiesError
err)
            UploadDispatcherForkWorkerWhenAvailable NESet Hash32
hashes -> [STM UploadDispatcherJob]
-> IO (Either (SyncError UploadEntitiesError) ())
doJob [NESet Hash32 -> STM UploadDispatcherJob
forkWorkerMode NESet Hash32
hashes, STM UploadDispatcherJob
checkForFailureMode]
            UploadDispatcherForkWorker NESet Hash32
hashes -> do
              Int
workerId <-
                STM Int -> IO Int
forall a. STM a -> IO a
atomically do
                  Int
workerId <- WorkerCount -> STM Int
forall a. TVar a -> STM a
readTVar WorkerCount
nextWorkerIdVar
                  WorkerCount -> Int -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar WorkerCount
nextWorkerIdVar (Int -> STM ()) -> Int -> STM ()
forall a b. (a -> b) -> a -> b
$! Int
workerId Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1
                  TVar (Set Int) -> (Set Int -> Set Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Set Int)
workersVar (Int -> Set Int -> Set Int
forall a. Ord a => a -> Set a -> Set a
Set.insert Int
workerId)
                  pure Int
workerId
              Thread ()
_ <-
                forall a. Scope -> IO a -> IO (Thread a)
Ki.fork @() Scope
scope do
                  AuthenticatedHttpClient
-> (forall x. Transaction x -> IO x)
-> TVar (Set Hash32)
-> TVar (Set Hash32)
-> TVar (Set Int)
-> TMVar (SyncError UploadEntitiesError)
-> Int
-> NESet Hash32
-> IO ()
worker AuthenticatedHttpClient
httpClient Transaction a -> IO a
forall x. Transaction x -> IO x
runTransaction TVar (Set Hash32)
hashesVar TVar (Set Hash32)
dedupeVar TVar (Set Int)
workersVar TMVar (SyncError UploadEntitiesError)
workerFailedVar Int
workerId NESet Hash32
hashes
              IO (Either (SyncError UploadEntitiesError) ())
loop
            UploadDispatcherJob
UploadDispatcherDone -> Either (SyncError UploadEntitiesError) ()
-> IO (Either (SyncError UploadEntitiesError) ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (() -> Either (SyncError UploadEntitiesError) ()
forall a b. b -> Either a b
Right ())

        checkForFailureMode :: STM UploadDispatcherJob
        checkForFailureMode :: STM UploadDispatcherJob
checkForFailureMode = do
          SyncError UploadEntitiesError
err <- TMVar (SyncError UploadEntitiesError)
-> STM (SyncError UploadEntitiesError)
forall a. TMVar a -> STM a
readTMVar TMVar (SyncError UploadEntitiesError)
workerFailedVar
          pure (SyncError UploadEntitiesError -> UploadDispatcherJob
UploadDispatcherReturnFailure SyncError UploadEntitiesError
err)

        dispatchWorkMode :: STM UploadDispatcherJob
        dispatchWorkMode :: STM UploadDispatcherJob
dispatchWorkMode = do
          Set Hash32
hashes <- TVar (Set Hash32) -> STM (Set Hash32)
forall a. TVar a -> STM a
readTVar TVar (Set Hash32)
hashesVar
          Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Set Hash32 -> Bool
forall a. Set a -> Bool
Set.null Set Hash32
hashes) STM ()
forall a. STM a
retry
          let (Set Hash32
hashes1, Set Hash32
hashes2) = Int -> Set Hash32 -> (Set Hash32, Set Hash32)
forall a. Int -> Set a -> (Set a, Set a)
Set.splitAt Int
syncChunkSize Set Hash32
hashes
          TVar (Set Hash32) -> (Set Hash32 -> Set Hash32) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Set Hash32)
dedupeVar (Set Hash32 -> Set Hash32 -> Set Hash32
forall a. Ord a => Set a -> Set a -> Set a
Set.union Set Hash32
hashes1)
          TVar (Set Hash32) -> Set Hash32 -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Set Hash32)
hashesVar Set Hash32
hashes2
          pure (NESet Hash32 -> UploadDispatcherJob
UploadDispatcherForkWorkerWhenAvailable (Set Hash32 -> NESet Hash32
forall a. Set a -> NESet a
NESet.unsafeFromSet Set Hash32
hashes1))

        forkWorkerMode :: NESet Hash32 -> STM UploadDispatcherJob
        forkWorkerMode :: NESet Hash32 -> STM UploadDispatcherJob
forkWorkerMode NESet Hash32
hashes = do
          Set Int
workers <- TVar (Set Int) -> STM (Set Int)
forall a. TVar a -> STM a
readTVar TVar (Set Int)
workersVar
          Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Set Int -> Int
forall a. Set a -> Int
Set.size Set Int
workers Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
maxSimultaneousPushWorkers) STM ()
forall a. STM a
retry
          pure (NESet Hash32 -> UploadDispatcherJob
UploadDispatcherForkWorker NESet Hash32
hashes)

        checkIfDoneMode :: STM UploadDispatcherJob
        checkIfDoneMode :: STM UploadDispatcherJob
checkIfDoneMode = do
          Set Int
workers <- TVar (Set Int) -> STM (Set Int)
forall a. TVar a -> STM a
readTVar TVar (Set Int)
workersVar
          Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not (Set Int -> Bool
forall a. Set a -> Bool
Set.null Set Int
workers)) STM ()
forall a. STM a
retry
          pure UploadDispatcherJob
UploadDispatcherDone

    worker ::
      AuthenticatedHttpClient ->
      (forall a. Sqlite.Transaction a -> IO a) ->
      TVar (Set Hash32) ->
      TVar (Set Hash32) ->
      TVar (Set Int) ->
      TMVar (SyncError Share.UploadEntitiesError) ->
      Int ->
      NESet Hash32 ->
      IO ()
    worker :: AuthenticatedHttpClient
-> (forall x. Transaction x -> IO x)
-> TVar (Set Hash32)
-> TVar (Set Hash32)
-> TVar (Set Int)
-> TMVar (SyncError UploadEntitiesError)
-> Int
-> NESet Hash32
-> IO ()
worker AuthenticatedHttpClient
httpClient forall x. Transaction x -> IO x
runTransaction TVar (Set Hash32)
hashesVar TVar (Set Hash32)
dedupeVar TVar (Set Int)
workersVar TMVar (SyncError UploadEntitiesError)
workerFailedVar Int
workerId NESet Hash32
hashes = do
      NEMap Hash32 (Entity Text Hash32 Hash32)
entities <-
        (NonEmpty (Hash32, Entity Text Hash32 Hash32)
 -> NEMap Hash32 (Entity Text Hash32 Hash32))
-> IO (NonEmpty (Hash32, Entity Text Hash32 Hash32))
-> IO (NEMap Hash32 (Entity Text Hash32 Hash32))
forall a b. (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap NonEmpty (Hash32, Entity Text Hash32 Hash32)
-> NEMap Hash32 (Entity Text Hash32 Hash32)
forall k a. Eq k => NonEmpty (k, a) -> NEMap k a
NEMap.fromAscList do
          Transaction (NonEmpty (Hash32, Entity Text Hash32 Hash32))
-> IO (NonEmpty (Hash32, Entity Text Hash32 Hash32))
forall x. Transaction x -> IO x
runTransaction do
            NonEmpty Hash32
-> (Hash32 -> Transaction (Hash32, Entity Text Hash32 Hash32))
-> Transaction (NonEmpty (Hash32, Entity Text Hash32 Hash32))
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
t a -> (a -> f b) -> f (t b)
for (NESet Hash32 -> NonEmpty Hash32
forall a. NESet a -> NonEmpty a
NESet.toAscList NESet Hash32
hashes) \Hash32
hash -> do
              Entity Text Hash32 Hash32
entity <- Hash32 -> Transaction (Entity Text Hash32 Hash32)
expectEntity Hash32
hash
              pure (Hash32
hash, Entity Text Hash32 Hash32
entity)

      Either (SyncError UploadEntitiesError) (Set Hash32)
result <-
        AuthenticatedHttpClient
-> BaseUrl
-> UploadEntitiesRequest
-> IO (Either CodeserverTransportError UploadEntitiesResponse)
httpUploadEntities AuthenticatedHttpClient
httpClient BaseUrl
unisonShareUrl Share.UploadEntitiesRequest {NEMap Hash32 (Entity Text Hash32 Hash32)
entities :: NEMap Hash32 (Entity Text Hash32 Hash32)
$sel:entities:UploadEntitiesRequest :: NEMap Hash32 (Entity Text Hash32 Hash32)
entities, RepoInfo
repoInfo :: RepoInfo
$sel:repoInfo:UploadEntitiesRequest :: RepoInfo
repoInfo} IO (Either CodeserverTransportError UploadEntitiesResponse)
-> (Either CodeserverTransportError UploadEntitiesResponse
    -> Either (SyncError UploadEntitiesError) (Set Hash32))
-> IO (Either (SyncError UploadEntitiesError) (Set Hash32))
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
          Left CodeserverTransportError
err -> SyncError UploadEntitiesError
-> Either (SyncError UploadEntitiesError) (Set Hash32)
forall a b. a -> Either a b
Left (CodeserverTransportError -> SyncError UploadEntitiesError
forall e. CodeserverTransportError -> SyncError e
TransportError CodeserverTransportError
err)
          Right UploadEntitiesResponse
response ->
            case UploadEntitiesResponse
response of
              UploadEntitiesResponse
Share.UploadEntitiesSuccess -> Set Hash32 -> Either (SyncError UploadEntitiesError) (Set Hash32)
forall a b. b -> Either a b
Right Set Hash32
forall a. Set a
Set.empty
              Share.UploadEntitiesFailure UploadEntitiesError
err ->
                case UploadEntitiesError
err of
                  Share.UploadEntitiesError'NeedDependencies (Share.NeedDependencies NESet Hash32
moreHashes) ->
                    Set Hash32 -> Either (SyncError UploadEntitiesError) (Set Hash32)
forall a b. b -> Either a b
Right (NESet Hash32 -> Set Hash32
forall a. NESet a -> Set a
NESet.toSet NESet Hash32
moreHashes)
                  UploadEntitiesError
err -> SyncError UploadEntitiesError
-> Either (SyncError UploadEntitiesError) (Set Hash32)
forall a b. a -> Either a b
Left (UploadEntitiesError -> SyncError UploadEntitiesError
forall e. e -> SyncError e
SyncError UploadEntitiesError
err)

      case Either (SyncError UploadEntitiesError) (Set Hash32)
result of
        Left SyncError UploadEntitiesError
err -> IO Bool -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (TMVar (SyncError UploadEntitiesError)
-> SyncError UploadEntitiesError -> STM Bool
forall a. TMVar a -> a -> STM Bool
tryPutTMVar TMVar (SyncError UploadEntitiesError)
workerFailedVar SyncError UploadEntitiesError
err))
        Right Set Hash32
moreHashes -> do
          Int -> IO ()
uploadedCallback (NESet Hash32 -> Int
forall a. NESet a -> Int
NESet.size NESet Hash32
hashes)
          Maybe Int
maybeYoungestWorkerThatWasAlive <-
            STM (Maybe Int) -> IO (Maybe Int)
forall a. STM a -> IO a
atomically do
              -- Record ourselves as "dead". The only work we have left to do is remove the hashes we just uploaded from
              -- the `dedupe` set, but whether or not we are "alive" is relevant only to:
              --
              --   - The main dispatcher thread, which terminates when there are no more hashes to upload, and no alive
              --     workers. It is not important for us to delete from the `dedupe` set in this case.
              --
              --   - Other worker threads, each of which independently decides when it is safe to delete the set of
              --     hashes they just uploaded from the `dedupe` set (as we are doing now).
              !Set Int
workers <- Int -> Set Int -> Set Int
forall a. Ord a => a -> Set a -> Set a
Set.delete Int
workerId (Set Int -> Set Int) -> STM (Set Int) -> STM (Set Int)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar (Set Int) -> STM (Set Int)
forall a. TVar a -> STM a
readTVar TVar (Set Int)
workersVar
              TVar (Set Int) -> Set Int -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Set Int)
workersVar Set Int
workers
              -- Add more work (i.e. hashes to upload) to the work queue (really a work set), per the response we just
              -- got from the server. Remember to only add hashes that aren't in the `dedupe` set (see the comment on
              -- the dedupe set above for more info).
              Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not (Set Hash32 -> Bool
forall a. Set a -> Bool
Set.null Set Hash32
moreHashes)) do
                Set Hash32
dedupe <- TVar (Set Hash32) -> STM (Set Hash32)
forall a. TVar a -> STM a
readTVar TVar (Set Hash32)
dedupeVar
                Set Hash32
hashes0 <- TVar (Set Hash32) -> STM (Set Hash32)
forall a. TVar a -> STM a
readTVar TVar (Set Hash32)
hashesVar
                TVar (Set Hash32) -> Set Hash32 -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Set Hash32)
hashesVar (Set Hash32 -> STM ()) -> Set Hash32 -> STM ()
forall a b. (a -> b) -> a -> b
$! Set Hash32 -> Set Hash32 -> Set Hash32
forall a. Ord a => Set a -> Set a -> Set a
Set.union (Set Hash32 -> Set Hash32 -> Set Hash32
forall a. Ord a => Set a -> Set a -> Set a
Set.difference Set Hash32
moreHashes Set Hash32
dedupe) Set Hash32
hashes0
              pure (Set Int -> Maybe Int
forall a. Set a -> Maybe a
Set.lookupMax Set Int
workers)
          -- Block until we are sure that the server does not have any uncommitted transactions that see a version of
          -- the database that does not include the entities we just uploaded. After that point, it's fine to remove the
          -- hashes of the entities we just uploaded from the `dedupe` set, because they will never be relevant for any
          -- subsequent deduping operations. If we didn't delete from the `dedupe` set, this algorithm would still be
          -- correct, it would just use an unbounded amount of memory to remember all the hashes we've uploaded so far.
          Maybe Int -> (Int -> IO ()) -> IO ()
forall (m :: * -> *) a.
Applicative m =>
Maybe a -> (a -> m ()) -> m ()
whenJust Maybe Int
maybeYoungestWorkerThatWasAlive \Int
youngestWorkerThatWasAlive -> do
            STM () -> IO ()
forall a. STM a -> IO a
atomically do
              Set Int
workers <- TVar (Set Int) -> STM (Set Int)
forall a. TVar a -> STM a
readTVar TVar (Set Int)
workersVar
              Maybe Int -> (Int -> STM ()) -> STM ()
forall (m :: * -> *) a.
Applicative m =>
Maybe a -> (a -> m ()) -> m ()
whenJust (Set Int -> Maybe Int
forall a. Set a -> Maybe a
Set.lookupMin Set Int
workers) \Int
oldestWorkerAlive ->
                Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
oldestWorkerAlive Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
youngestWorkerThatWasAlive) STM ()
forall a. STM a
retry
          STM () -> IO ()
forall a. STM a -> IO a
atomically (TVar (Set Hash32) -> (Set Hash32 -> Set Hash32) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Set Hash32)
dedupeVar (Set Hash32 -> Set Hash32 -> Set Hash32
forall a. Ord a => Set a -> Set a -> Set a
`Set.difference` (NESet Hash32 -> Set Hash32
forall a. NESet a -> Set a
NESet.toSet NESet Hash32
hashes)))

------------------------------------------------------------------------------------------------------------------------
-- Database operations

-- | "Elaborate" a set of `temp_entity` hashes.
--
-- For each hash, then we ought to instead download its missing dependencies (which themselves are
--    elaborated by this same procedure, in case we have any of *them* already in temp storage, too.
-- 3. If it's in main storage, we should ignore it.
--
-- In the end, we return a set of hashes that correspond to entities we actually need to download.
elaborateHashes :: NESet Hash32 -> Sqlite.Transaction (Set Share.HashJWT)
elaborateHashes :: NESet Hash32 -> Transaction (Set HashJWT)
elaborateHashes NESet Hash32
hashes =
  NonEmpty Hash32 -> Transaction [Text]
Q.elaborateHashes (NESet Hash32 -> NonEmpty Hash32
forall a. NESet a -> NonEmpty a
NESet.toList NESet Hash32
hashes) Transaction [Text]
-> ([Text] -> Set HashJWT) -> Transaction (Set HashJWT)
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> [HashJWT] -> Set HashJWT
forall a. Ord a => [a] -> Set a
Set.fromList ([HashJWT] -> Set HashJWT)
-> ([Text] -> [HashJWT]) -> [Text] -> Set HashJWT
forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. Coercible a b => a -> b
forall a b. Coercible a b => a -> b
coerce @[Text] @[Share.HashJWT]

-- | Upsert a downloaded entity "somewhere" -
--
--   1. Nowhere if we already had the entity (in main or temp storage).
--   2. In main storage if we already have all of its dependencies in main storage.
--   3. In temp storage otherwise.
upsertEntitySomewhere ::
  Hash32 ->
  Share.Entity Text Hash32 Share.HashJWT ->
  Sqlite.Transaction Q.EntityLocation
upsertEntitySomewhere :: Hash32 -> Entity Text Hash32 HashJWT -> Transaction EntityLocation
upsertEntitySomewhere Hash32
hash Entity Text Hash32 HashJWT
entity =
  Hash32 -> Transaction (Maybe EntityLocation)
Q.entityLocation Hash32
hash Transaction (Maybe EntityLocation)
-> (Maybe EntityLocation -> Transaction EntityLocation)
-> Transaction EntityLocation
forall a b. Transaction a -> (a -> Transaction b) -> Transaction b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    Just EntityLocation
location -> EntityLocation -> Transaction EntityLocation
forall a. a -> Transaction a
forall (f :: * -> *) a. Applicative f => a -> f a
pure EntityLocation
location
    Maybe EntityLocation
Nothing -> do
      Map Hash32 HashJWT
missingDependencies1 :: Map Hash32 Share.HashJWT <-
        Entity Text Hash32 HashJWT -> Set HashJWT
forall hash text noSyncHash.
Ord hash =>
Entity text noSyncHash hash -> Set hash
Share.entityDependencies Entity Text Hash32 HashJWT
entity
          Set HashJWT
-> (Set HashJWT -> Transaction (Map Hash32 HashJWT))
-> Transaction (Map Hash32 HashJWT)
forall a b. a -> (a -> b) -> b
& (HashJWT -> Transaction (Map Hash32 HashJWT))
-> Set HashJWT -> Transaction (Map Hash32 HashJWT)
forall (m :: * -> *) (f :: * -> *) b a.
(Monad m, Foldable f, Monoid b) =>
(a -> m b) -> f a -> m b
foldMapM
            ( \HashJWT
hashJwt -> do
                let hash :: Hash32
hash = HashJWT -> Hash32
Share.hashJWTHash HashJWT
hashJwt
                Hash32 -> Transaction Bool
Q.entityExists Hash32
hash Transaction Bool
-> (Bool -> Map Hash32 HashJWT) -> Transaction (Map Hash32 HashJWT)
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
                  Bool
True -> Map Hash32 HashJWT
forall k a. Map k a
Map.empty
                  Bool
False -> Hash32 -> HashJWT -> Map Hash32 HashJWT
forall k a. k -> a -> Map k a
Map.singleton Hash32
hash HashJWT
hashJwt
            )
      case Map Hash32 HashJWT -> Maybe (NEMap Hash32 HashJWT)
forall k a. Map k a -> Maybe (NEMap k a)
NEMap.nonEmptyMap Map Hash32 HashJWT
missingDependencies1 of
        Maybe (NEMap Hash32 HashJWT)
Nothing -> do
          Either CausalHashId ObjectId
_id <- HashHandle
-> Hash32
-> TempEntity
-> Transaction (Either CausalHashId ObjectId)
Q.saveTempEntityInMain HashHandle
v2HashHandle Hash32
hash ((HashJWT -> Hash32) -> Entity Text Hash32 HashJWT -> TempEntity
forall hash.
(hash -> Hash32) -> Entity Text Hash32 hash -> TempEntity
entityToTempEntity HashJWT -> Hash32
Share.hashJWTHash Entity Text Hash32 HashJWT
entity)
          pure EntityLocation
Q.EntityInMainStorage
        Just NEMap Hash32 HashJWT
missingDependencies -> do
          Hash32 -> TempEntity -> NEMap Hash32 Text -> Transaction ()
Q.insertTempEntity
            Hash32
hash
            ((HashJWT -> Hash32) -> Entity Text Hash32 HashJWT -> TempEntity
forall hash.
(hash -> Hash32) -> Entity Text Hash32 hash -> TempEntity
entityToTempEntity HashJWT -> Hash32
Share.hashJWTHash Entity Text Hash32 HashJWT
entity)
            ( forall a b. Coercible a b => a -> b
forall a b. Coercible a b => a -> b
coerce
                @(NEMap Hash32 Share.HashJWT)
                @(NEMap Hash32 Text)
                NEMap Hash32 HashJWT
missingDependencies
            )
          pure EntityLocation
Q.EntityInTempStorage

------------------------------------------------------------------------------------------------------------------------
-- HTTP calls

httpGetCausalHashByPath ::
  Auth.AuthenticatedHttpClient ->
  BaseUrl ->
  Share.GetCausalHashByPathRequest ->
  IO (Either CodeserverTransportError Share.GetCausalHashByPathResponse)
httpDownloadEntities ::
  Auth.AuthenticatedHttpClient ->
  BaseUrl ->
  Share.DownloadEntitiesRequest ->
  IO (Either CodeserverTransportError Share.DownloadEntitiesResponse)
httpUploadEntities ::
  Auth.AuthenticatedHttpClient ->
  BaseUrl ->
  Share.UploadEntitiesRequest ->
  IO (Either CodeserverTransportError Share.UploadEntitiesResponse)
( AuthenticatedHttpClient
-> BaseUrl
-> GetCausalHashByPathRequest
-> IO (Either CodeserverTransportError GetCausalHashByPathResponse)
httpGetCausalHashByPath,
  AuthenticatedHttpClient
-> BaseUrl
-> DownloadEntitiesRequest
-> IO (Either CodeserverTransportError DownloadEntitiesResponse)
httpDownloadEntities,
  AuthenticatedHttpClient
-> BaseUrl
-> UploadEntitiesRequest
-> IO (Either CodeserverTransportError UploadEntitiesResponse)
httpUploadEntities
  ) =
    let ( GetCausalHashByPathRequest
-> ReaderT
     ClientEnv
     (ExceptT CodeserverTransportError IO)
     GetCausalHashByPathResponse
httpGetCausalHashByPath
            Servant.:<|> DownloadEntitiesRequest
-> ReaderT
     ClientEnv
     (ExceptT CodeserverTransportError IO)
     DownloadEntitiesResponse
httpDownloadEntities
            Servant.:<|> UploadEntitiesRequest
-> ReaderT
     ClientEnv
     (ExceptT CodeserverTransportError IO)
     UploadEntitiesResponse
httpUploadEntities
          ) =
            let pp :: Proxy ("ucm" Servant.:> "v1" Servant.:> "sync" Servant.:> Share.API)
                pp :: Proxy ("ucm" :> ("v1" :> ("sync" :> API)))
pp = Proxy ("ucm" :> ("v1" :> ("sync" :> API)))
forall {k} (t :: k). Proxy t
Proxy
             in Proxy ("ucm" :> ("v1" :> ("sync" :> API)))
-> (forall a.
    ClientM a
    -> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) a)
-> Client ClientM ("ucm" :> ("v1" :> ("sync" :> API)))
-> Client
     (ReaderT ClientEnv (ExceptT CodeserverTransportError IO))
     ("ucm" :> ("v1" :> ("sync" :> API)))
forall api (m :: * -> *) (n :: * -> *).
HasClient ClientM api =>
Proxy api -> (forall a. m a -> n a) -> Client m api -> Client n api
Servant.hoistClient Proxy ("ucm" :> ("v1" :> ("sync" :> API)))
pp ClientM a
-> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) a
forall a.
ClientM a
-> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) a
hoist (Proxy ("ucm" :> ("v1" :> ("sync" :> API)))
-> Client ClientM ("ucm" :> ("v1" :> ("sync" :> API)))
forall api.
HasClient ClientM api =>
Proxy api -> Client ClientM api
Servant.client Proxy ("ucm" :> ("v1" :> ("sync" :> API)))
pp)
     in ( (GetCausalHashByPathRequest
 -> ReaderT
      ClientEnv
      (ExceptT CodeserverTransportError IO)
      GetCausalHashByPathResponse)
-> AuthenticatedHttpClient
-> BaseUrl
-> GetCausalHashByPathRequest
-> IO (Either CodeserverTransportError GetCausalHashByPathResponse)
forall req resp.
(req
 -> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) resp)
-> AuthenticatedHttpClient
-> BaseUrl
-> req
-> IO (Either CodeserverTransportError resp)
go GetCausalHashByPathRequest
-> ReaderT
     ClientEnv
     (ExceptT CodeserverTransportError IO)
     GetCausalHashByPathResponse
httpGetCausalHashByPath,
          (DownloadEntitiesRequest
 -> ReaderT
      ClientEnv
      (ExceptT CodeserverTransportError IO)
      DownloadEntitiesResponse)
-> AuthenticatedHttpClient
-> BaseUrl
-> DownloadEntitiesRequest
-> IO (Either CodeserverTransportError DownloadEntitiesResponse)
forall req resp.
(req
 -> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) resp)
-> AuthenticatedHttpClient
-> BaseUrl
-> req
-> IO (Either CodeserverTransportError resp)
go DownloadEntitiesRequest
-> ReaderT
     ClientEnv
     (ExceptT CodeserverTransportError IO)
     DownloadEntitiesResponse
httpDownloadEntities,
          (UploadEntitiesRequest
 -> ReaderT
      ClientEnv
      (ExceptT CodeserverTransportError IO)
      UploadEntitiesResponse)
-> AuthenticatedHttpClient
-> BaseUrl
-> UploadEntitiesRequest
-> IO (Either CodeserverTransportError UploadEntitiesResponse)
forall req resp.
(req
 -> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) resp)
-> AuthenticatedHttpClient
-> BaseUrl
-> req
-> IO (Either CodeserverTransportError resp)
go UploadEntitiesRequest
-> ReaderT
     ClientEnv
     (ExceptT CodeserverTransportError IO)
     UploadEntitiesResponse
httpUploadEntities
        )
    where
      hoist :: Servant.ClientM a -> ReaderT Servant.ClientEnv (ExceptT CodeserverTransportError IO) a
      hoist :: forall a.
ClientM a
-> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) a
hoist ClientM a
m = do
        ClientEnv
clientEnv <- ReaderT ClientEnv (ExceptT CodeserverTransportError IO) ClientEnv
forall (m :: * -> *) r. Monad m => ReaderT r m r
Reader.ask
        IO (Either ClientError a)
-> ReaderT
     ClientEnv
     (ExceptT CodeserverTransportError IO)
     (Either ClientError a)
forall a.
IO a -> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (ClientM a -> ClientEnv -> IO (Either ClientError a)
forall a. ClientM a -> ClientEnv -> IO (Either ClientError a)
Servant.runClientM ClientM a
m ClientEnv
clientEnv) ReaderT
  ClientEnv
  (ExceptT CodeserverTransportError IO)
  (Either ClientError a)
-> (Either ClientError a
    -> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) a)
-> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) a
forall a b.
ReaderT ClientEnv (ExceptT CodeserverTransportError IO) a
-> (a -> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) b)
-> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
          Right a
a -> a -> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) a
forall a.
a -> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
a
          Left ClientError
err -> do
            DebugFlag
-> String
-> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) ()
forall (m :: * -> *). Monad m => DebugFlag -> String -> m ()
Debug.debugLogM DebugFlag
Debug.Sync (ClientError -> String
forall a. Show a => a -> String
show ClientError
err)
            CodeserverTransportError
-> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) a
forall a.
CodeserverTransportError
-> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError case ClientError
err of
              Servant.FailureResponse RequestF () (BaseUrl, ByteString)
_req Response
resp ->
                case Status -> Int
HTTP.statusCode (Status -> Int) -> Status -> Int
forall a b. (a -> b) -> a -> b
$ Response -> Status
forall a. ResponseF a -> Status
Servant.responseStatusCode Response
resp of
                  Int
401 -> BaseUrl -> CodeserverTransportError
Unauthenticated (ClientEnv -> BaseUrl
Servant.baseUrl ClientEnv
clientEnv)
                  -- The server should provide semantically relevant permission-denied messages
                  -- when possible, but this should catch any we miss.
                  Int
403 -> Text -> CodeserverTransportError
PermissionDenied (Text -> Text
Text.Lazy.toStrict (Text -> Text) -> (ByteString -> Text) -> ByteString -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Text
Text.Lazy.decodeUtf8 (ByteString -> Text) -> ByteString -> Text
forall a b. (a -> b) -> a -> b
$ Response -> ByteString
forall a. ResponseF a -> a
Servant.responseBody Response
resp)
                  Int
408 -> CodeserverTransportError
Timeout
                  Int
429 -> CodeserverTransportError
RateLimitExceeded
                  Int
504 -> CodeserverTransportError
Timeout
                  Int
_ -> Response -> CodeserverTransportError
UnexpectedResponse Response
resp
              Servant.DecodeFailure Text
msg Response
resp -> Text -> Response -> CodeserverTransportError
DecodeFailure Text
msg Response
resp
              Servant.UnsupportedContentType MediaType
_ct Response
resp -> Response -> CodeserverTransportError
UnexpectedResponse Response
resp
              Servant.InvalidContentTypeHeader Response
resp -> Response -> CodeserverTransportError
UnexpectedResponse Response
resp
              Servant.ConnectionError SomeException
_ -> BaseUrl -> CodeserverTransportError
UnreachableCodeserver (ClientEnv -> BaseUrl
Servant.baseUrl ClientEnv
clientEnv)

      go ::
        (req -> ReaderT Servant.ClientEnv (ExceptT CodeserverTransportError IO) resp) ->
        Auth.AuthenticatedHttpClient ->
        BaseUrl ->
        req ->
        IO (Either CodeserverTransportError resp)
      go :: forall req resp.
(req
 -> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) resp)
-> AuthenticatedHttpClient
-> BaseUrl
-> req
-> IO (Either CodeserverTransportError resp)
go req -> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) resp
f (Auth.AuthenticatedHttpClient Manager
httpClient) BaseUrl
unisonShareUrl req
req =
        (Manager -> BaseUrl -> ClientEnv
Servant.mkClientEnv Manager
httpClient BaseUrl
unisonShareUrl)
          { Servant.makeClientRequest = \BaseUrl
url Request
request ->
              -- Disable client-side timeouts
              (BaseUrl -> Request -> IO Request
Servant.defaultMakeClientRequest BaseUrl
url Request
request)
                IO Request -> (Request -> Request) -> IO Request
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \Request
r ->
                  Request
r
                    { Http.Client.responseTimeout = Http.Client.responseTimeoutNone
                    }
          }
          ClientEnv
-> (ClientEnv -> ExceptT CodeserverTransportError IO resp)
-> ExceptT CodeserverTransportError IO resp
forall a b. a -> (a -> b) -> b
& ReaderT ClientEnv (ExceptT CodeserverTransportError IO) resp
-> ClientEnv -> ExceptT CodeserverTransportError IO resp
forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
runReaderT (req -> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) resp
f req
req)
          ExceptT CodeserverTransportError IO resp
-> (ExceptT CodeserverTransportError IO resp
    -> IO (Either CodeserverTransportError resp))
-> IO (Either CodeserverTransportError resp)
forall a b. a -> (a -> b) -> b
& ExceptT CodeserverTransportError IO resp
-> IO (Either CodeserverTransportError resp)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT