{-# LANGUAGE DataKinds #-}
{-# LANGUAGE TypeOperators #-}
module Unison.Share.Sync
(
getCausalHashByPath,
GetCausalHashByPathError (..),
uploadEntities,
pull,
PullError (..),
downloadEntities,
)
where
import Control.Concurrent.STM
import Control.Lens
import Control.Monad.Except
import Control.Monad.Reader (ask)
import Control.Monad.Trans.Reader (ReaderT, runReaderT)
import Control.Monad.Trans.Reader qualified as Reader
import Data.Map qualified as Map
import Data.Map.NonEmpty (NEMap)
import Data.Map.NonEmpty qualified as NEMap
import Data.Proxy
import Data.Set qualified as Set
import Data.Set.NonEmpty (NESet)
import Data.Set.NonEmpty qualified as NESet
import Data.Text.Lazy qualified as Text.Lazy
import Data.Text.Lazy.Encoding qualified as Text.Lazy
import GHC.IO (unsafePerformIO)
import Ki qualified
import Network.HTTP.Client qualified as Http.Client
import Network.HTTP.Types qualified as HTTP
import Servant.API qualified as Servant ((:<|>) (..), (:>))
import Servant.Client (BaseUrl)
import Servant.Client qualified as Servant
import System.Environment (lookupEnv)
import U.Codebase.HashTags (CausalHash)
import U.Codebase.Sqlite.Queries qualified as Q
import U.Codebase.Sqlite.V2.HashHandle (v2HashHandle)
import Unison.Auth.HTTPClient (AuthenticatedHttpClient)
import Unison.Auth.HTTPClient qualified as Auth
import Unison.Cli.Monad (Cli)
import Unison.Cli.Monad qualified as Cli
import Unison.Codebase qualified as Codebase
import Unison.Debug qualified as Debug
import Unison.Hash32 (Hash32)
import Unison.Prelude
import Unison.Share.API.Hash qualified as Share
import Unison.Share.ExpectedHashMismatches (expectedCausalHashMismatches, expectedComponentHashMismatches)
import Unison.Share.Sync.Types
import Unison.Sqlite qualified as Sqlite
import Unison.Sync.API qualified as Share (API)
import Unison.Sync.Common (entityToTempEntity, expectEntity, hash32ToCausalHash)
import Unison.Sync.EntityValidation qualified as EV
import Unison.Sync.Types qualified as Share
import Unison.Util.Monoid (foldMapM)
maxSimultaneousPullDownloaders :: Int
maxSimultaneousPullDownloaders :: Int
maxSimultaneousPullDownloaders = IO Int -> Int
forall a. IO a -> a
unsafePerformIO (IO Int -> Int) -> IO Int -> Int
forall a b. (a -> b) -> a -> b
$ do
String -> IO (Maybe String)
lookupEnv String
"UNISON_PULL_WORKERS" IO (Maybe String) -> (Maybe String -> Int) -> IO Int
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
Just String
n -> String -> Int
forall a. Read a => String -> a
read String
n
Maybe String
Nothing -> Int
5
{-# NOINLINE maxSimultaneousPullDownloaders #-}
maxSimultaneousPushWorkers :: Int
maxSimultaneousPushWorkers :: Int
maxSimultaneousPushWorkers = IO Int -> Int
forall a. IO a -> a
unsafePerformIO (IO Int -> Int) -> IO Int -> Int
forall a b. (a -> b) -> a -> b
$ do
String -> IO (Maybe String)
lookupEnv String
"UNISON_PUSH_WORKERS" IO (Maybe String) -> (Maybe String -> Int) -> IO Int
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
Just String
n -> String -> Int
forall a. Read a => String -> a
read String
n
Maybe String
Nothing -> Int
1
{-# NOINLINE maxSimultaneousPushWorkers #-}
syncChunkSize :: Int
syncChunkSize :: Int
syncChunkSize = IO Int -> Int
forall a. IO a -> a
unsafePerformIO (IO Int -> Int) -> IO Int -> Int
forall a b. (a -> b) -> a -> b
$ do
String -> IO (Maybe String)
lookupEnv String
"UNISON_SYNC_CHUNK_SIZE" IO (Maybe String) -> (Maybe String -> Int) -> IO Int
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
Just String
n -> String -> Int
forall a. Read a => String -> a
read String
n
Maybe String
Nothing -> Int
50
{-# NOINLINE syncChunkSize #-}
pull ::
BaseUrl ->
Share.Path ->
(Int -> IO ()) ->
Cli (Either (SyncError PullError) CausalHash)
pull :: BaseUrl
-> Path
-> (Int -> IO ())
-> Cli (Either (SyncError PullError) CausalHash)
pull BaseUrl
unisonShareUrl Path
repoPath Int -> IO ()
downloadedCallback =
BaseUrl
-> Path
-> Cli
(Either (SyncError GetCausalHashByPathError) (Maybe HashJWT))
getCausalHashByPath BaseUrl
unisonShareUrl Path
repoPath Cli (Either (SyncError GetCausalHashByPathError) (Maybe HashJWT))
-> (Either (SyncError GetCausalHashByPathError) (Maybe HashJWT)
-> Cli (Either (SyncError PullError) CausalHash))
-> Cli (Either (SyncError PullError) CausalHash)
forall a b. Cli a -> (a -> Cli b) -> Cli b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Left SyncError GetCausalHashByPathError
err -> Either (SyncError PullError) CausalHash
-> Cli (Either (SyncError PullError) CausalHash)
forall a. a -> Cli a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (SyncError PullError -> Either (SyncError PullError) CausalHash
forall a b. a -> Either a b
Left (GetCausalHashByPathError -> PullError
PullError'GetCausalHash (GetCausalHashByPathError -> PullError)
-> SyncError GetCausalHashByPathError -> SyncError PullError
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> SyncError GetCausalHashByPathError
err))
Right Maybe HashJWT
Nothing -> Either (SyncError PullError) CausalHash
-> Cli (Either (SyncError PullError) CausalHash)
forall a. a -> Cli a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (SyncError PullError -> Either (SyncError PullError) CausalHash
forall a b. a -> Either a b
Left (PullError -> SyncError PullError
forall e. e -> SyncError e
SyncError (Path -> PullError
PullError'NoHistoryAtPath Path
repoPath)))
Right (Just HashJWT
hashJwt) ->
BaseUrl
-> RepoInfo
-> HashJWT
-> (Int -> IO ())
-> Cli (Either (SyncError DownloadEntitiesError) ())
downloadEntities BaseUrl
unisonShareUrl (Path -> RepoInfo
Share.pathRepoInfo Path
repoPath) HashJWT
hashJwt Int -> IO ()
downloadedCallback Cli (Either (SyncError DownloadEntitiesError) ())
-> (Either (SyncError DownloadEntitiesError) ()
-> Either (SyncError PullError) CausalHash)
-> Cli (Either (SyncError PullError) CausalHash)
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
Left SyncError DownloadEntitiesError
err -> SyncError PullError -> Either (SyncError PullError) CausalHash
forall a b. a -> Either a b
Left (DownloadEntitiesError -> PullError
PullError'DownloadEntities (DownloadEntitiesError -> PullError)
-> SyncError DownloadEntitiesError -> SyncError PullError
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> SyncError DownloadEntitiesError
err)
Right () -> CausalHash -> Either (SyncError PullError) CausalHash
forall a b. b -> Either a b
Right (Hash32 -> CausalHash
hash32ToCausalHash (HashJWT -> Hash32
Share.hashJWTHash HashJWT
hashJwt))
downloadEntities ::
BaseUrl ->
Share.RepoInfo ->
Share.HashJWT ->
(Int -> IO ()) ->
Cli (Either (SyncError Share.DownloadEntitiesError) ())
downloadEntities :: BaseUrl
-> RepoInfo
-> HashJWT
-> (Int -> IO ())
-> Cli (Either (SyncError DownloadEntitiesError) ())
downloadEntities BaseUrl
unisonShareUrl RepoInfo
repoInfo HashJWT
hashJwt Int -> IO ()
downloadedCallback = do
Cli.Env {AuthenticatedHttpClient
authHTTPClient :: AuthenticatedHttpClient
$sel:authHTTPClient:Env :: Env -> AuthenticatedHttpClient
authHTTPClient, Codebase IO Symbol Ann
codebase :: Codebase IO Symbol Ann
$sel:codebase:Env :: Env -> Codebase IO Symbol Ann
codebase} <- Cli Env
forall r (m :: * -> *). MonadReader r m => m r
ask
((forall void.
Either (SyncError DownloadEntitiesError) () -> Cli void)
-> Cli (Either (SyncError DownloadEntitiesError) ()))
-> Cli (Either (SyncError DownloadEntitiesError) ())
forall a. ((forall void. a -> Cli void) -> Cli a) -> Cli a
Cli.label \forall void.
Either (SyncError DownloadEntitiesError) () -> Cli void
done -> do
let failed :: SyncError Share.DownloadEntitiesError -> Cli void
failed :: forall void. SyncError DownloadEntitiesError -> Cli void
failed = Either (SyncError DownloadEntitiesError) () -> Cli void
forall void.
Either (SyncError DownloadEntitiesError) () -> Cli void
done (Either (SyncError DownloadEntitiesError) () -> Cli void)
-> (SyncError DownloadEntitiesError
-> Either (SyncError DownloadEntitiesError) ())
-> SyncError DownloadEntitiesError
-> Cli void
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncError DownloadEntitiesError
-> Either (SyncError DownloadEntitiesError) ()
forall a b. a -> Either a b
Left
let hash :: Hash32
hash = HashJWT -> Hash32
Share.hashJWTHash HashJWT
hashJwt
Maybe (NESet Hash32)
maybeTempEntities <-
Transaction (Maybe EntityLocation) -> Cli (Maybe EntityLocation)
forall a. Transaction a -> Cli a
Cli.runTransaction (Hash32 -> Transaction (Maybe EntityLocation)
Q.entityLocation Hash32
hash) Cli (Maybe EntityLocation)
-> (Maybe EntityLocation -> Cli (Maybe (NESet Hash32)))
-> Cli (Maybe (NESet Hash32))
forall a b. Cli a -> (a -> Cli b) -> Cli b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Just EntityLocation
Q.EntityInMainStorage -> Maybe (NESet Hash32) -> Cli (Maybe (NESet Hash32))
forall a. a -> Cli a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (NESet Hash32)
forall a. Maybe a
Nothing
Just EntityLocation
Q.EntityInTempStorage -> Maybe (NESet Hash32) -> Cli (Maybe (NESet Hash32))
forall a. a -> Cli a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (NESet Hash32 -> Maybe (NESet Hash32)
forall a. a -> Maybe a
Just (Hash32 -> NESet Hash32
forall a. a -> NESet a
NESet.singleton Hash32
hash))
Maybe EntityLocation
Nothing -> do
let request :: IO (Either CodeserverTransportError DownloadEntitiesResponse)
request =
AuthenticatedHttpClient
-> BaseUrl
-> DownloadEntitiesRequest
-> IO (Either CodeserverTransportError DownloadEntitiesResponse)
httpDownloadEntities
AuthenticatedHttpClient
authHTTPClient
BaseUrl
unisonShareUrl
Share.DownloadEntitiesRequest {RepoInfo
repoInfo :: RepoInfo
$sel:repoInfo:DownloadEntitiesRequest :: RepoInfo
repoInfo, $sel:hashes:DownloadEntitiesRequest :: NESet HashJWT
hashes = HashJWT -> NESet HashJWT
forall a. a -> NESet a
NESet.singleton HashJWT
hashJwt}
NEMap Hash32 (Entity Text Hash32 HashJWT)
entities <-
IO (Either CodeserverTransportError DownloadEntitiesResponse)
-> Cli (Either CodeserverTransportError DownloadEntitiesResponse)
forall a. IO a -> Cli a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (Either CodeserverTransportError DownloadEntitiesResponse)
request Cli (Either CodeserverTransportError DownloadEntitiesResponse)
-> (Either CodeserverTransportError DownloadEntitiesResponse
-> Cli (NEMap Hash32 (Entity Text Hash32 HashJWT)))
-> Cli (NEMap Hash32 (Entity Text Hash32 HashJWT))
forall a b. Cli a -> (a -> Cli b) -> Cli b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Left CodeserverTransportError
err -> SyncError DownloadEntitiesError
-> Cli (NEMap Hash32 (Entity Text Hash32 HashJWT))
forall void. SyncError DownloadEntitiesError -> Cli void
failed (CodeserverTransportError -> SyncError DownloadEntitiesError
forall e. CodeserverTransportError -> SyncError e
TransportError CodeserverTransportError
err)
Right (Share.DownloadEntitiesFailure DownloadEntitiesError
err) -> SyncError DownloadEntitiesError
-> Cli (NEMap Hash32 (Entity Text Hash32 HashJWT))
forall void. SyncError DownloadEntitiesError -> Cli void
failed (DownloadEntitiesError -> SyncError DownloadEntitiesError
forall e. e -> SyncError e
SyncError DownloadEntitiesError
err)
Right (Share.DownloadEntitiesSuccess NEMap Hash32 (Entity Text Hash32 HashJWT)
entities) -> NEMap Hash32 (Entity Text Hash32 HashJWT)
-> Cli (NEMap Hash32 (Entity Text Hash32 HashJWT))
forall a. a -> Cli a
forall (f :: * -> *) a. Applicative f => a -> f a
pure NEMap Hash32 (Entity Text Hash32 HashJWT)
entities
case NEMap Hash32 (Entity Text Hash32 HashJWT)
-> Either EntityValidationError ()
validateEntities NEMap Hash32 (Entity Text Hash32 HashJWT)
entities of
Left EntityValidationError
err -> SyncError DownloadEntitiesError -> Cli ()
forall void. SyncError DownloadEntitiesError -> Cli void
failed (SyncError DownloadEntitiesError -> Cli ())
-> (EntityValidationError -> SyncError DownloadEntitiesError)
-> EntityValidationError
-> Cli ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DownloadEntitiesError -> SyncError DownloadEntitiesError
forall e. e -> SyncError e
SyncError (DownloadEntitiesError -> SyncError DownloadEntitiesError)
-> (EntityValidationError -> DownloadEntitiesError)
-> EntityValidationError
-> SyncError DownloadEntitiesError
forall b c a. (b -> c) -> (a -> b) -> a -> c
. EntityValidationError -> DownloadEntitiesError
Share.DownloadEntitiesEntityValidationFailure (EntityValidationError -> Cli ())
-> EntityValidationError -> Cli ()
forall a b. (a -> b) -> a -> b
$ EntityValidationError
err
Right () -> () -> Cli ()
forall a. a -> Cli a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Set Hash32
tempEntities <- Transaction (Set Hash32) -> Cli (Set Hash32)
forall a. Transaction a -> Cli a
Cli.runTransaction (NEMap Hash32 (Entity Text Hash32 HashJWT)
-> Transaction (Set Hash32)
insertEntities NEMap Hash32 (Entity Text Hash32 HashJWT)
entities)
IO () -> Cli ()
forall a. IO a -> Cli a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Int -> IO ()
downloadedCallback Int
1)
pure (Set Hash32 -> Maybe (NESet Hash32)
forall a. Set a -> Maybe (NESet a)
NESet.nonEmptySet Set Hash32
tempEntities)
Maybe (NESet Hash32) -> (NESet Hash32 -> Cli ()) -> Cli ()
forall (m :: * -> *) a.
Applicative m =>
Maybe a -> (a -> m ()) -> m ()
whenJust Maybe (NESet Hash32)
maybeTempEntities \NESet Hash32
tempEntities -> do
let doCompleteTempEntities :: IO (Either (SyncError DownloadEntitiesError) ())
doCompleteTempEntities =
AuthenticatedHttpClient
-> BaseUrl
-> (forall a. ((forall x. Transaction x -> IO x) -> IO a) -> IO a)
-> RepoInfo
-> (Int -> IO ())
-> NESet Hash32
-> IO (Either (SyncError DownloadEntitiesError) ())
completeTempEntities
AuthenticatedHttpClient
authHTTPClient
BaseUrl
unisonShareUrl
( \(forall x. Transaction x -> IO x) -> IO a
action ->
Codebase IO Symbol Ann -> forall x. (Connection -> IO x) -> IO x
forall (m :: * -> *) v a.
Codebase m v a -> forall x. (Connection -> m x) -> m x
Codebase.withConnection Codebase IO Symbol Ann
codebase \Connection
conn ->
(forall x. Transaction x -> IO x) -> IO a
action (Connection -> Transaction x -> IO x
forall (m :: * -> *) a.
(MonadIO m, HasCallStack) =>
Connection -> Transaction a -> m a
Sqlite.runTransaction Connection
conn)
)
RepoInfo
repoInfo
Int -> IO ()
downloadedCallback
NESet Hash32
tempEntities
IO (Either (SyncError DownloadEntitiesError) ())
-> Cli (Either (SyncError DownloadEntitiesError) ())
forall a. IO a -> Cli a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (Either (SyncError DownloadEntitiesError) ())
doCompleteTempEntities Cli (Either (SyncError DownloadEntitiesError) ())
-> (Cli (Either (SyncError DownloadEntitiesError) ()) -> Cli ())
-> Cli ()
forall a b. a -> (a -> b) -> b
& (SyncError DownloadEntitiesError -> Cli ())
-> Cli (Either (SyncError DownloadEntitiesError) ()) -> Cli ()
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> m (Either a b) -> m b
onLeftM \SyncError DownloadEntitiesError
err ->
SyncError DownloadEntitiesError -> Cli ()
forall void. SyncError DownloadEntitiesError -> Cli void
failed SyncError DownloadEntitiesError
err
Bool
_success <- IO Bool -> Cli Bool
forall a. IO a -> Cli a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Codebase IO Symbol Ann -> forall x. (Connection -> IO x) -> IO x
forall (m :: * -> *) v a.
Codebase m v a -> forall x. (Connection -> m x) -> m x
Codebase.withConnection Codebase IO Symbol Ann
codebase Connection -> IO Bool
Sqlite.vacuum)
pure (() -> Either (SyncError DownloadEntitiesError) ()
forall a b. b -> Either a b
Right ())
validateEntities :: NEMap Hash32 (Share.Entity Text Hash32 Share.HashJWT) -> Either Share.EntityValidationError ()
validateEntities :: NEMap Hash32 (Entity Text Hash32 HashJWT)
-> Either EntityValidationError ()
validateEntities NEMap Hash32 (Entity Text Hash32 HashJWT)
entities =
Bool
-> Either EntityValidationError ()
-> Either EntityValidationError ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
shouldValidateEntities (Either EntityValidationError ()
-> Either EntityValidationError ())
-> Either EntityValidationError ()
-> Either EntityValidationError ()
forall a b. (a -> b) -> a -> b
$ do
Map Hash32 (Entity Text Hash32 HashJWT)
-> (Hash32
-> Entity Text Hash32 HashJWT -> Either EntityValidationError ())
-> Either EntityValidationError ()
forall i (t :: * -> *) (f :: * -> *) a b.
(FoldableWithIndex i t, Applicative f) =>
t a -> (i -> a -> f b) -> f ()
ifor_ (NEMap Hash32 (Entity Text Hash32 HashJWT)
-> Map Hash32 (Entity Text Hash32 HashJWT)
forall k a. NEMap k a -> Map k a
NEMap.toMap NEMap Hash32 (Entity Text Hash32 HashJWT)
entities) \Hash32
hash Entity Text Hash32 HashJWT
entity -> do
let entityWithHashes :: Entity Text Hash32 Hash32
entityWithHashes = Entity Text Hash32 HashJWT
entity Entity Text Hash32 HashJWT
-> (Entity Text Hash32 HashJWT -> Entity Text Hash32 Hash32)
-> Entity Text Hash32 Hash32
forall a b. a -> (a -> b) -> b
& (HashJWT -> Identity Hash32)
-> Entity Text Hash32 HashJWT
-> Identity (Entity Text Hash32 Hash32)
forall (m :: * -> *) hash' hash text noSyncHash.
(Applicative m, Ord hash') =>
(hash -> m hash')
-> Entity text noSyncHash hash -> m (Entity text noSyncHash hash')
Share.entityHashes_ ((HashJWT -> Identity Hash32)
-> Entity Text Hash32 HashJWT
-> Identity (Entity Text Hash32 Hash32))
-> (HashJWT -> Hash32)
-> Entity Text Hash32 HashJWT
-> Entity Text Hash32 Hash32
forall s t a b. ASetter s t a b -> (a -> b) -> s -> t
%~ HashJWT -> Hash32
Share.hashJWTHash
case Hash32 -> Entity Text Hash32 Hash32 -> Maybe EntityValidationError
EV.validateEntity Hash32
hash Entity Text Hash32 Hash32
entityWithHashes of
Maybe EntityValidationError
Nothing -> () -> Either EntityValidationError ()
forall a. a -> Either EntityValidationError a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Just err :: EntityValidationError
err@(Share.EntityHashMismatch EntityType
et (Share.HashMismatchForEntity {Hash32
supplied :: Hash32
$sel:supplied:HashMismatchForEntity :: HashMismatchForEntity -> Hash32
supplied, Hash32
computed :: Hash32
$sel:computed:HashMismatchForEntity :: HashMismatchForEntity -> Hash32
computed})) ->
let expectedMismatches :: Map Hash32 Hash32
expectedMismatches = case EntityType
et of
EntityType
Share.TermComponentType -> Map Hash32 Hash32
expectedComponentHashMismatches
EntityType
Share.DeclComponentType -> Map Hash32 Hash32
expectedComponentHashMismatches
EntityType
Share.CausalType -> Map Hash32 Hash32
expectedCausalHashMismatches
EntityType
_ -> Map Hash32 Hash32
forall a. Monoid a => a
mempty
in case Hash32 -> Map Hash32 Hash32 -> Maybe Hash32
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup Hash32
supplied Map Hash32 Hash32
expectedMismatches of
Just Hash32
expected
| Hash32
expected Hash32 -> Hash32 -> Bool
forall a. Eq a => a -> a -> Bool
== Hash32
computed -> () -> Either EntityValidationError ()
forall a. a -> Either EntityValidationError a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Maybe Hash32
_ -> do
EntityValidationError -> Either EntityValidationError ()
forall a b. a -> Either a b
Left EntityValidationError
err
Just EntityValidationError
err -> do
EntityValidationError -> Either EntityValidationError ()
forall a b. a -> Either a b
Left EntityValidationError
err
validationEnvKey :: String
validationEnvKey :: String
validationEnvKey = String
"UNISON_ENTITY_VALIDATION"
shouldValidateEntities :: Bool
shouldValidateEntities :: Bool
shouldValidateEntities = IO Bool -> Bool
forall a. IO a -> a
unsafePerformIO (IO Bool -> Bool) -> IO Bool -> Bool
forall a b. (a -> b) -> a -> b
$ do
String -> IO (Maybe String)
lookupEnv String
validationEnvKey IO (Maybe String) -> (Maybe String -> Bool) -> IO Bool
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
Just String
"false" -> Bool
False
Maybe String
_ -> Bool
True
{-# NOINLINE shouldValidateEntities #-}
type WorkerCount =
TVar Int
newWorkerCount :: IO WorkerCount
newWorkerCount :: IO WorkerCount
newWorkerCount =
Int -> IO WorkerCount
forall a. a -> IO (TVar a)
newTVarIO Int
0
recordWorking :: WorkerCount -> STM ()
recordWorking :: WorkerCount -> STM ()
recordWorking WorkerCount
sem =
WorkerCount -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' WorkerCount
sem (Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
recordNotWorking :: WorkerCount -> STM ()
recordNotWorking :: WorkerCount -> STM ()
recordNotWorking WorkerCount
sem =
WorkerCount -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' WorkerCount
sem \Int
n -> Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1
data DispatcherJob
= DispatcherForkWorker (NESet Share.HashJWT)
| DispatcherReturnEarlyBecauseDownloaderFailed (SyncError Share.DownloadEntitiesError)
| DispatcherDone
completeTempEntities ::
AuthenticatedHttpClient ->
BaseUrl ->
(forall a. ((forall x. Sqlite.Transaction x -> IO x) -> IO a) -> IO a) ->
Share.RepoInfo ->
(Int -> IO ()) ->
NESet Hash32 ->
IO (Either (SyncError Share.DownloadEntitiesError) ())
completeTempEntities :: AuthenticatedHttpClient
-> BaseUrl
-> (forall a. ((forall x. Transaction x -> IO x) -> IO a) -> IO a)
-> RepoInfo
-> (Int -> IO ())
-> NESet Hash32
-> IO (Either (SyncError DownloadEntitiesError) ())
completeTempEntities AuthenticatedHttpClient
httpClient BaseUrl
unisonShareUrl forall a. ((forall x. Transaction x -> IO x) -> IO a) -> IO a
connect RepoInfo
repoInfo Int -> IO ()
downloadedCallback NESet Hash32
initialNewTempEntities = do
TVar (Set HashJWT)
hashesVar <- Set HashJWT -> IO (TVar (Set HashJWT))
forall a. a -> IO (TVar a)
newTVarIO Set HashJWT
forall a. Set a
Set.empty
TVar (Set HashJWT)
uninsertedHashesVar <- Set HashJWT -> IO (TVar (Set HashJWT))
forall a. a -> IO (TVar a)
newTVarIO Set HashJWT
forall a. Set a
Set.empty
TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
entitiesQueue <- IO
(TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT)))
forall a. IO (TQueue a)
newTQueueIO
TQueue (Set HashJWT, Maybe (NESet Hash32))
newTempEntitiesQueue <- IO (TQueue (Set HashJWT, Maybe (NESet Hash32)))
forall a. IO (TQueue a)
newTQueueIO
WorkerCount
workerCount <- IO WorkerCount
newWorkerCount
TMVar (SyncError DownloadEntitiesError)
downloaderFailedVar <- IO (TMVar (SyncError DownloadEntitiesError))
forall a. IO (TMVar a)
newEmptyTMVarIO
STM () -> IO ()
forall a. STM a -> IO a
atomically (TQueue (Set HashJWT, Maybe (NESet Hash32))
-> (Set HashJWT, Maybe (NESet Hash32)) -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue TQueue (Set HashJWT, Maybe (NESet Hash32))
newTempEntitiesQueue (Set HashJWT
forall a. Set a
Set.empty, NESet Hash32 -> Maybe (NESet Hash32)
forall a. a -> Maybe a
Just NESet Hash32
initialNewTempEntities))
(Scope -> IO (Either (SyncError DownloadEntitiesError) ()))
-> IO (Either (SyncError DownloadEntitiesError) ())
forall a. (Scope -> IO a) -> IO a
Ki.scoped \Scope
scope -> do
Scope -> IO Void -> IO ()
Ki.fork_ Scope
scope (TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
-> TQueue (Set HashJWT, Maybe (NESet Hash32))
-> WorkerCount
-> IO Void
inserter TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
entitiesQueue TQueue (Set HashJWT, Maybe (NESet Hash32))
newTempEntitiesQueue WorkerCount
workerCount)
Scope -> IO Void -> IO ()
Ki.fork_ Scope
scope (TVar (Set HashJWT)
-> TVar (Set HashJWT)
-> TQueue (Set HashJWT, Maybe (NESet Hash32))
-> WorkerCount
-> IO Void
elaborator TVar (Set HashJWT)
hashesVar TVar (Set HashJWT)
uninsertedHashesVar TQueue (Set HashJWT, Maybe (NESet Hash32))
newTempEntitiesQueue WorkerCount
workerCount)
TVar (Set HashJWT)
-> TVar (Set HashJWT)
-> TQueue
(NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
-> TQueue (Set HashJWT, Maybe (NESet Hash32))
-> WorkerCount
-> TMVar (SyncError DownloadEntitiesError)
-> IO (Either (SyncError DownloadEntitiesError) ())
dispatcher TVar (Set HashJWT)
hashesVar TVar (Set HashJWT)
uninsertedHashesVar TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
entitiesQueue TQueue (Set HashJWT, Maybe (NESet Hash32))
newTempEntitiesQueue WorkerCount
workerCount TMVar (SyncError DownloadEntitiesError)
downloaderFailedVar
where
dispatcher ::
TVar (Set Share.HashJWT) ->
TVar (Set Share.HashJWT) ->
TQueue (NESet Share.HashJWT, NEMap Hash32 (Share.Entity Text Hash32 Share.HashJWT)) ->
TQueue (Set Share.HashJWT, Maybe (NESet Hash32)) ->
WorkerCount ->
TMVar (SyncError Share.DownloadEntitiesError) ->
IO (Either (SyncError Share.DownloadEntitiesError) ())
dispatcher :: TVar (Set HashJWT)
-> TVar (Set HashJWT)
-> TQueue
(NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
-> TQueue (Set HashJWT, Maybe (NESet Hash32))
-> WorkerCount
-> TMVar (SyncError DownloadEntitiesError)
-> IO (Either (SyncError DownloadEntitiesError) ())
dispatcher TVar (Set HashJWT)
hashesVar TVar (Set HashJWT)
uninsertedHashesVar TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
entitiesQueue TQueue (Set HashJWT, Maybe (NESet Hash32))
newTempEntitiesQueue WorkerCount
workerCount TMVar (SyncError DownloadEntitiesError)
downloaderFailedVar =
(Scope -> IO (Either (SyncError DownloadEntitiesError) ()))
-> IO (Either (SyncError DownloadEntitiesError) ())
forall a. (Scope -> IO a) -> IO a
Ki.scoped \Scope
scope ->
let loop :: IO (Either (SyncError Share.DownloadEntitiesError) ())
loop :: IO (Either (SyncError DownloadEntitiesError) ())
loop =
STM DispatcherJob -> IO DispatcherJob
forall a. STM a -> IO a
atomically (STM DispatcherJob
checkIfDownloaderFailedMode STM DispatcherJob -> STM DispatcherJob -> STM DispatcherJob
forall a. STM a -> STM a -> STM a
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> STM DispatcherJob
dispatchWorkMode STM DispatcherJob -> STM DispatcherJob -> STM DispatcherJob
forall a. STM a -> STM a -> STM a
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> STM DispatcherJob
checkIfDoneMode) IO DispatcherJob
-> (DispatcherJob
-> IO (Either (SyncError DownloadEntitiesError) ()))
-> IO (Either (SyncError DownloadEntitiesError) ())
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
DispatcherJob
DispatcherDone -> Either (SyncError DownloadEntitiesError) ()
-> IO (Either (SyncError DownloadEntitiesError) ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (() -> Either (SyncError DownloadEntitiesError) ()
forall a b. b -> Either a b
Right ())
DispatcherReturnEarlyBecauseDownloaderFailed SyncError DownloadEntitiesError
err -> Either (SyncError DownloadEntitiesError) ()
-> IO (Either (SyncError DownloadEntitiesError) ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (SyncError DownloadEntitiesError
-> Either (SyncError DownloadEntitiesError) ()
forall a b. a -> Either a b
Left SyncError DownloadEntitiesError
err)
DispatcherForkWorker NESet HashJWT
hashes -> do
STM () -> IO ()
forall a. STM a -> IO a
atomically do
Int
workers <- WorkerCount -> STM Int
forall a. TVar a -> STM a
readTVar WorkerCount
workerCount
Bool -> STM ()
check (Int
workers Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
maxSimultaneousPullDownloaders Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
2)
WorkerCount -> STM ()
recordWorking WorkerCount
workerCount
Thread ()
_ <-
forall a. Scope -> IO a -> IO (Thread a)
Ki.fork @() Scope
scope do
TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
-> WorkerCount
-> NESet HashJWT
-> IO (Either (SyncError DownloadEntitiesError) ())
downloader TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
entitiesQueue WorkerCount
workerCount NESet HashJWT
hashes IO (Either (SyncError DownloadEntitiesError) ())
-> (IO (Either (SyncError DownloadEntitiesError) ()) -> IO ())
-> IO ()
forall a b. a -> (a -> b) -> b
& (SyncError DownloadEntitiesError -> IO ())
-> IO (Either (SyncError DownloadEntitiesError) ()) -> IO ()
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> m (Either a b) -> m b
onLeftM \SyncError DownloadEntitiesError
err ->
IO Bool -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (TMVar (SyncError DownloadEntitiesError)
-> SyncError DownloadEntitiesError -> STM Bool
forall a. TMVar a -> a -> STM Bool
tryPutTMVar TMVar (SyncError DownloadEntitiesError)
downloaderFailedVar SyncError DownloadEntitiesError
err))
IO (Either (SyncError DownloadEntitiesError) ())
loop
in IO (Either (SyncError DownloadEntitiesError) ())
loop
where
checkIfDownloaderFailedMode :: STM DispatcherJob
checkIfDownloaderFailedMode :: STM DispatcherJob
checkIfDownloaderFailedMode =
SyncError DownloadEntitiesError -> DispatcherJob
DispatcherReturnEarlyBecauseDownloaderFailed (SyncError DownloadEntitiesError -> DispatcherJob)
-> STM (SyncError DownloadEntitiesError) -> STM DispatcherJob
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TMVar (SyncError DownloadEntitiesError)
-> STM (SyncError DownloadEntitiesError)
forall a. TMVar a -> STM a
readTMVar TMVar (SyncError DownloadEntitiesError)
downloaderFailedVar
dispatchWorkMode :: STM DispatcherJob
dispatchWorkMode :: STM DispatcherJob
dispatchWorkMode = do
Set HashJWT
hashes <- TVar (Set HashJWT) -> STM (Set HashJWT)
forall a. TVar a -> STM a
readTVar TVar (Set HashJWT)
hashesVar
Bool -> STM ()
check (Bool -> Bool
not (Set HashJWT -> Bool
forall a. Set a -> Bool
Set.null Set HashJWT
hashes))
let (Set HashJWT
hashes1, Set HashJWT
hashes2) = Int -> Set HashJWT -> (Set HashJWT, Set HashJWT)
forall a. Int -> Set a -> (Set a, Set a)
Set.splitAt Int
syncChunkSize Set HashJWT
hashes
TVar (Set HashJWT) -> (Set HashJWT -> Set HashJWT) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Set HashJWT)
uninsertedHashesVar (Set HashJWT -> Set HashJWT -> Set HashJWT
forall a. Ord a => Set a -> Set a -> Set a
Set.union Set HashJWT
hashes1)
TVar (Set HashJWT) -> Set HashJWT -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Set HashJWT)
hashesVar Set HashJWT
hashes2
pure (NESet HashJWT -> DispatcherJob
DispatcherForkWorker (Set HashJWT -> NESet HashJWT
forall a. Set a -> NESet a
NESet.unsafeFromSet Set HashJWT
hashes1))
checkIfDoneMode :: STM DispatcherJob
checkIfDoneMode :: STM DispatcherJob
checkIfDoneMode = do
Int
workers <- WorkerCount -> STM Int
forall a. TVar a -> STM a
readTVar WorkerCount
workerCount
Bool -> STM ()
check (Int
workers Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0)
TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
-> STM Bool
forall a. TQueue a -> STM Bool
isEmptyTQueue TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
entitiesQueue STM Bool -> (Bool -> STM ()) -> STM ()
forall a b. STM a -> (a -> STM b) -> STM b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Bool -> STM ()
check
TQueue (Set HashJWT, Maybe (NESet Hash32)) -> STM Bool
forall a. TQueue a -> STM Bool
isEmptyTQueue TQueue (Set HashJWT, Maybe (NESet Hash32))
newTempEntitiesQueue STM Bool -> (Bool -> STM ()) -> STM ()
forall a b. STM a -> (a -> STM b) -> STM b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Bool -> STM ()
check
pure DispatcherJob
DispatcherDone
downloader ::
TQueue (NESet Share.HashJWT, NEMap Hash32 (Share.Entity Text Hash32 Share.HashJWT)) ->
WorkerCount ->
NESet Share.HashJWT ->
IO (Either (SyncError Share.DownloadEntitiesError) ())
downloader :: TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
-> WorkerCount
-> NESet HashJWT
-> IO (Either (SyncError DownloadEntitiesError) ())
downloader TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
entitiesQueue WorkerCount
workerCount NESet HashJWT
hashes = do
AuthenticatedHttpClient
-> BaseUrl
-> DownloadEntitiesRequest
-> IO (Either CodeserverTransportError DownloadEntitiesResponse)
httpDownloadEntities AuthenticatedHttpClient
httpClient BaseUrl
unisonShareUrl Share.DownloadEntitiesRequest {RepoInfo
$sel:repoInfo:DownloadEntitiesRequest :: RepoInfo
repoInfo :: RepoInfo
repoInfo, NESet HashJWT
$sel:hashes:DownloadEntitiesRequest :: NESet HashJWT
hashes :: NESet HashJWT
hashes} IO (Either CodeserverTransportError DownloadEntitiesResponse)
-> (Either CodeserverTransportError DownloadEntitiesResponse
-> IO (Either (SyncError DownloadEntitiesError) ()))
-> IO (Either (SyncError DownloadEntitiesError) ())
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Left CodeserverTransportError
err -> do
STM () -> IO ()
forall a. STM a -> IO a
atomically (WorkerCount -> STM ()
recordNotWorking WorkerCount
workerCount)
pure (SyncError DownloadEntitiesError
-> Either (SyncError DownloadEntitiesError) ()
forall a b. a -> Either a b
Left (CodeserverTransportError -> SyncError DownloadEntitiesError
forall e. CodeserverTransportError -> SyncError e
TransportError CodeserverTransportError
err))
Right (Share.DownloadEntitiesFailure DownloadEntitiesError
err) -> do
STM () -> IO ()
forall a. STM a -> IO a
atomically (WorkerCount -> STM ()
recordNotWorking WorkerCount
workerCount)
pure (SyncError DownloadEntitiesError
-> Either (SyncError DownloadEntitiesError) ()
forall a b. a -> Either a b
Left (DownloadEntitiesError -> SyncError DownloadEntitiesError
forall e. e -> SyncError e
SyncError DownloadEntitiesError
err))
Right (Share.DownloadEntitiesSuccess NEMap Hash32 (Entity Text Hash32 HashJWT)
entities) -> do
Int -> IO ()
downloadedCallback (NESet HashJWT -> Int
forall a. NESet a -> Int
NESet.size NESet HashJWT
hashes)
case NEMap Hash32 (Entity Text Hash32 HashJWT)
-> Either EntityValidationError ()
validateEntities NEMap Hash32 (Entity Text Hash32 HashJWT)
entities of
Left EntityValidationError
err -> Either (SyncError DownloadEntitiesError) ()
-> IO (Either (SyncError DownloadEntitiesError) ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either (SyncError DownloadEntitiesError) ()
-> IO (Either (SyncError DownloadEntitiesError) ()))
-> (EntityValidationError
-> Either (SyncError DownloadEntitiesError) ())
-> EntityValidationError
-> IO (Either (SyncError DownloadEntitiesError) ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncError DownloadEntitiesError
-> Either (SyncError DownloadEntitiesError) ()
forall a b. a -> Either a b
Left (SyncError DownloadEntitiesError
-> Either (SyncError DownloadEntitiesError) ())
-> (EntityValidationError -> SyncError DownloadEntitiesError)
-> EntityValidationError
-> Either (SyncError DownloadEntitiesError) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DownloadEntitiesError -> SyncError DownloadEntitiesError
forall e. e -> SyncError e
SyncError (DownloadEntitiesError -> SyncError DownloadEntitiesError)
-> (EntityValidationError -> DownloadEntitiesError)
-> EntityValidationError
-> SyncError DownloadEntitiesError
forall b c a. (b -> c) -> (a -> b) -> a -> c
. EntityValidationError -> DownloadEntitiesError
Share.DownloadEntitiesEntityValidationFailure (EntityValidationError
-> IO (Either (SyncError DownloadEntitiesError) ()))
-> EntityValidationError
-> IO (Either (SyncError DownloadEntitiesError) ())
forall a b. (a -> b) -> a -> b
$ EntityValidationError
err
Right () -> do
STM () -> IO ()
forall a. STM a -> IO a
atomically do
TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
-> (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
-> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
entitiesQueue (NESet HashJWT
hashes, NEMap Hash32 (Entity Text Hash32 HashJWT)
entities)
WorkerCount -> STM ()
recordNotWorking WorkerCount
workerCount
pure (() -> Either (SyncError DownloadEntitiesError) ()
forall a b. b -> Either a b
Right ())
inserter ::
TQueue (NESet Share.HashJWT, NEMap Hash32 (Share.Entity Text Hash32 Share.HashJWT)) ->
TQueue (Set Share.HashJWT, Maybe (NESet Hash32)) ->
WorkerCount ->
IO Void
inserter :: TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
-> TQueue (Set HashJWT, Maybe (NESet Hash32))
-> WorkerCount
-> IO Void
inserter TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
entitiesQueue TQueue (Set HashJWT, Maybe (NESet Hash32))
newTempEntitiesQueue WorkerCount
workerCount =
((forall x. Transaction x -> IO x) -> IO Void) -> IO Void
forall a. ((forall x. Transaction x -> IO x) -> IO a) -> IO a
connect \forall x. Transaction x -> IO x
runTransaction ->
IO () -> IO Void
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever do
(NESet HashJWT
hashJwts, NEMap Hash32 (Entity Text Hash32 HashJWT)
entities) <-
STM (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
-> IO (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
forall a. STM a -> IO a
atomically do
(NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
entities <- TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
-> STM (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
forall a. TQueue a -> STM a
readTQueue TQueue (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
entitiesQueue
WorkerCount -> STM ()
recordWorking WorkerCount
workerCount
pure (NESet HashJWT, NEMap Hash32 (Entity Text Hash32 HashJWT))
entities
Set Hash32
newTempEntities0 <-
Transaction (Set Hash32) -> IO (Set Hash32)
forall x. Transaction x -> IO x
runTransaction do
NEMap Hash32 (Entity Text Hash32 HashJWT)
-> NonEmpty (Hash32, Entity Text Hash32 HashJWT)
forall k a. NEMap k a -> NonEmpty (k, a)
NEMap.toList NEMap Hash32 (Entity Text Hash32 HashJWT)
entities NonEmpty (Hash32, Entity Text Hash32 HashJWT)
-> (NonEmpty (Hash32, Entity Text Hash32 HashJWT)
-> Transaction (Set Hash32))
-> Transaction (Set Hash32)
forall a b. a -> (a -> b) -> b
& ((Hash32, Entity Text Hash32 HashJWT) -> Transaction (Set Hash32))
-> NonEmpty (Hash32, Entity Text Hash32 HashJWT)
-> Transaction (Set Hash32)
forall (m :: * -> *) (f :: * -> *) b a.
(Monad m, Foldable f, Monoid b) =>
(a -> m b) -> f a -> m b
foldMapM \(Hash32
hash, Entity Text Hash32 HashJWT
entity) ->
Hash32 -> Entity Text Hash32 HashJWT -> Transaction EntityLocation
upsertEntitySomewhere Hash32
hash Entity Text Hash32 HashJWT
entity Transaction EntityLocation
-> (EntityLocation -> Set Hash32) -> Transaction (Set Hash32)
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
EntityLocation
Q.EntityInMainStorage -> Set Hash32
forall a. Set a
Set.empty
EntityLocation
Q.EntityInTempStorage -> Hash32 -> Set Hash32
forall a. a -> Set a
Set.singleton Hash32
hash
STM () -> IO ()
forall a. STM a -> IO a
atomically do
TQueue (Set HashJWT, Maybe (NESet Hash32))
-> (Set HashJWT, Maybe (NESet Hash32)) -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue TQueue (Set HashJWT, Maybe (NESet Hash32))
newTempEntitiesQueue (NESet HashJWT -> Set HashJWT
forall a. NESet a -> Set a
NESet.toSet NESet HashJWT
hashJwts, Set Hash32 -> Maybe (NESet Hash32)
forall a. Set a -> Maybe (NESet a)
NESet.nonEmptySet Set Hash32
newTempEntities0)
WorkerCount -> STM ()
recordNotWorking WorkerCount
workerCount
elaborator ::
TVar (Set Share.HashJWT) ->
TVar (Set Share.HashJWT) ->
TQueue (Set Share.HashJWT, Maybe (NESet Hash32)) ->
WorkerCount ->
IO Void
elaborator :: TVar (Set HashJWT)
-> TVar (Set HashJWT)
-> TQueue (Set HashJWT, Maybe (NESet Hash32))
-> WorkerCount
-> IO Void
elaborator TVar (Set HashJWT)
hashesVar TVar (Set HashJWT)
uninsertedHashesVar TQueue (Set HashJWT, Maybe (NESet Hash32))
newTempEntitiesQueue WorkerCount
workerCount =
((forall x. Transaction x -> IO x) -> IO Void) -> IO Void
forall a. ((forall x. Transaction x -> IO x) -> IO a) -> IO a
connect \forall x. Transaction x -> IO x
runTransaction ->
IO () -> IO Void
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever do
Maybe (NESet Hash32)
maybeNewTempEntities <-
STM (Maybe (NESet Hash32)) -> IO (Maybe (NESet Hash32))
forall a. STM a -> IO a
atomically do
(Set HashJWT
hashJwts, Maybe (NESet Hash32)
mayNewTempEntities) <- TQueue (Set HashJWT, Maybe (NESet Hash32))
-> STM (Set HashJWT, Maybe (NESet Hash32))
forall a. TQueue a -> STM a
readTQueue TQueue (Set HashJWT, Maybe (NESet Hash32))
newTempEntitiesQueue
TVar (Set HashJWT) -> (Set HashJWT -> Set HashJWT) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Set HashJWT)
uninsertedHashesVar \Set HashJWT
uninsertedHashes -> Set HashJWT -> Set HashJWT -> Set HashJWT
forall a. Ord a => Set a -> Set a -> Set a
Set.difference Set HashJWT
uninsertedHashes Set HashJWT
hashJwts
case Maybe (NESet Hash32)
mayNewTempEntities of
Maybe (NESet Hash32)
Nothing -> Maybe (NESet Hash32) -> STM (Maybe (NESet Hash32))
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (NESet Hash32)
forall a. Maybe a
Nothing
Just NESet Hash32
newTempEntities -> do
WorkerCount -> STM ()
recordWorking WorkerCount
workerCount
pure (NESet Hash32 -> Maybe (NESet Hash32)
forall a. a -> Maybe a
Just NESet Hash32
newTempEntities)
Maybe (NESet Hash32) -> (NESet Hash32 -> IO ()) -> IO ()
forall (m :: * -> *) a.
Applicative m =>
Maybe a -> (a -> m ()) -> m ()
whenJust Maybe (NESet Hash32)
maybeNewTempEntities \NESet Hash32
newTempEntities -> do
Set HashJWT
newElaboratedHashes <- Transaction (Set HashJWT) -> IO (Set HashJWT)
forall x. Transaction x -> IO x
runTransaction (NESet Hash32 -> Transaction (Set HashJWT)
elaborateHashes NESet Hash32
newTempEntities)
STM () -> IO ()
forall a. STM a -> IO a
atomically do
Set HashJWT
uninsertedHashes <- TVar (Set HashJWT) -> STM (Set HashJWT)
forall a. TVar a -> STM a
readTVar TVar (Set HashJWT)
uninsertedHashesVar
Set HashJWT
hashes0 <- TVar (Set HashJWT) -> STM (Set HashJWT)
forall a. TVar a -> STM a
readTVar TVar (Set HashJWT)
hashesVar
TVar (Set HashJWT) -> Set HashJWT -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Set HashJWT)
hashesVar (Set HashJWT -> STM ()) -> Set HashJWT -> STM ()
forall a b. (a -> b) -> a -> b
$! Set HashJWT -> Set HashJWT -> Set HashJWT
forall a. Ord a => Set a -> Set a -> Set a
Set.union (Set HashJWT -> Set HashJWT -> Set HashJWT
forall a. Ord a => Set a -> Set a -> Set a
Set.difference Set HashJWT
newElaboratedHashes Set HashJWT
uninsertedHashes) Set HashJWT
hashes0
WorkerCount -> STM ()
recordNotWorking WorkerCount
workerCount
insertEntities :: NEMap Hash32 (Share.Entity Text Hash32 Share.HashJWT) -> Sqlite.Transaction (Set Hash32)
insertEntities :: NEMap Hash32 (Entity Text Hash32 HashJWT)
-> Transaction (Set Hash32)
insertEntities NEMap Hash32 (Entity Text Hash32 HashJWT)
entities =
NEMap Hash32 (Entity Text Hash32 HashJWT)
-> NonEmpty (Hash32, Entity Text Hash32 HashJWT)
forall k a. NEMap k a -> NonEmpty (k, a)
NEMap.toList NEMap Hash32 (Entity Text Hash32 HashJWT)
entities NonEmpty (Hash32, Entity Text Hash32 HashJWT)
-> (NonEmpty (Hash32, Entity Text Hash32 HashJWT)
-> Transaction (Set Hash32))
-> Transaction (Set Hash32)
forall a b. a -> (a -> b) -> b
& ((Hash32, Entity Text Hash32 HashJWT) -> Transaction (Set Hash32))
-> NonEmpty (Hash32, Entity Text Hash32 HashJWT)
-> Transaction (Set Hash32)
forall (m :: * -> *) (f :: * -> *) b a.
(Monad m, Foldable f, Monoid b) =>
(a -> m b) -> f a -> m b
foldMapM \(Hash32
hash, Entity Text Hash32 HashJWT
entity) ->
Hash32 -> Entity Text Hash32 HashJWT -> Transaction EntityLocation
upsertEntitySomewhere Hash32
hash Entity Text Hash32 HashJWT
entity Transaction EntityLocation
-> (EntityLocation -> Set Hash32) -> Transaction (Set Hash32)
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
EntityLocation
Q.EntityInMainStorage -> Set Hash32
forall a. Set a
Set.empty
EntityLocation
Q.EntityInTempStorage -> Hash32 -> Set Hash32
forall a. a -> Set a
Set.singleton Hash32
hash
getCausalHashByPath ::
BaseUrl ->
Share.Path ->
Cli (Either (SyncError GetCausalHashByPathError) (Maybe Share.HashJWT))
getCausalHashByPath :: BaseUrl
-> Path
-> Cli
(Either (SyncError GetCausalHashByPathError) (Maybe HashJWT))
getCausalHashByPath BaseUrl
unisonShareUrl Path
repoPath = do
Cli.Env {AuthenticatedHttpClient
$sel:authHTTPClient:Env :: Env -> AuthenticatedHttpClient
authHTTPClient :: AuthenticatedHttpClient
authHTTPClient} <- Cli Env
forall r (m :: * -> *). MonadReader r m => m r
ask
IO (Either CodeserverTransportError GetCausalHashByPathResponse)
-> Cli
(Either CodeserverTransportError GetCausalHashByPathResponse)
forall a. IO a -> Cli a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (AuthenticatedHttpClient
-> BaseUrl
-> GetCausalHashByPathRequest
-> IO (Either CodeserverTransportError GetCausalHashByPathResponse)
httpGetCausalHashByPath AuthenticatedHttpClient
authHTTPClient BaseUrl
unisonShareUrl (Path -> GetCausalHashByPathRequest
Share.GetCausalHashByPathRequest Path
repoPath)) Cli (Either CodeserverTransportError GetCausalHashByPathResponse)
-> (Either CodeserverTransportError GetCausalHashByPathResponse
-> Either (SyncError GetCausalHashByPathError) (Maybe HashJWT))
-> Cli
(Either (SyncError GetCausalHashByPathError) (Maybe HashJWT))
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
Left CodeserverTransportError
err -> SyncError GetCausalHashByPathError
-> Either (SyncError GetCausalHashByPathError) (Maybe HashJWT)
forall a b. a -> Either a b
Left (CodeserverTransportError -> SyncError GetCausalHashByPathError
forall e. CodeserverTransportError -> SyncError e
TransportError CodeserverTransportError
err)
Right (Share.GetCausalHashByPathSuccess Maybe HashJWT
maybeHashJwt) -> Maybe HashJWT
-> Either (SyncError GetCausalHashByPathError) (Maybe HashJWT)
forall a b. b -> Either a b
Right Maybe HashJWT
maybeHashJwt
Right (Share.GetCausalHashByPathNoReadPermission Path
_) ->
SyncError GetCausalHashByPathError
-> Either (SyncError GetCausalHashByPathError) (Maybe HashJWT)
forall a b. a -> Either a b
Left (GetCausalHashByPathError -> SyncError GetCausalHashByPathError
forall e. e -> SyncError e
SyncError (Path -> GetCausalHashByPathError
GetCausalHashByPathErrorNoReadPermission Path
repoPath))
Right (Share.GetCausalHashByPathInvalidRepoInfo Text
err RepoInfo
repoInfo) ->
SyncError GetCausalHashByPathError
-> Either (SyncError GetCausalHashByPathError) (Maybe HashJWT)
forall a b. a -> Either a b
Left (GetCausalHashByPathError -> SyncError GetCausalHashByPathError
forall e. e -> SyncError e
SyncError (Text -> RepoInfo -> GetCausalHashByPathError
GetCausalHashByPathErrorInvalidRepoInfo Text
err RepoInfo
repoInfo))
Right GetCausalHashByPathResponse
Share.GetCausalHashByPathUserNotFound ->
SyncError GetCausalHashByPathError
-> Either (SyncError GetCausalHashByPathError) (Maybe HashJWT)
forall a b. a -> Either a b
Left (GetCausalHashByPathError -> SyncError GetCausalHashByPathError
forall e. e -> SyncError e
SyncError (GetCausalHashByPathError -> SyncError GetCausalHashByPathError)
-> GetCausalHashByPathError -> SyncError GetCausalHashByPathError
forall a b. (a -> b) -> a -> b
$ RepoInfo -> GetCausalHashByPathError
GetCausalHashByPathErrorUserNotFound (Path -> RepoInfo
Share.pathRepoInfo Path
repoPath))
data UploadDispatcherJob
= UploadDispatcherReturnFailure (SyncError Share.UploadEntitiesError)
| UploadDispatcherForkWorkerWhenAvailable (NESet Hash32)
| UploadDispatcherForkWorker (NESet Hash32)
| UploadDispatcherDone
uploadEntities ::
BaseUrl ->
Share.RepoInfo ->
NESet Hash32 ->
(Int -> IO ()) ->
Cli (Either (SyncError Share.UploadEntitiesError) ())
uploadEntities :: BaseUrl
-> RepoInfo
-> NESet Hash32
-> (Int -> IO ())
-> Cli (Either (SyncError UploadEntitiesError) ())
uploadEntities BaseUrl
unisonShareUrl RepoInfo
repoInfo NESet Hash32
hashes0 Int -> IO ()
uploadedCallback = do
Cli.Env {AuthenticatedHttpClient
$sel:authHTTPClient:Env :: Env -> AuthenticatedHttpClient
authHTTPClient :: AuthenticatedHttpClient
authHTTPClient, Codebase IO Symbol Ann
$sel:codebase:Env :: Env -> Codebase IO Symbol Ann
codebase :: Codebase IO Symbol Ann
codebase} <- Cli Env
forall r (m :: * -> *). MonadReader r m => m r
ask
IO (Either (SyncError UploadEntitiesError) ())
-> Cli (Either (SyncError UploadEntitiesError) ())
forall a. IO a -> Cli a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO do
TVar (Set Hash32)
hashesVar <- Set Hash32 -> IO (TVar (Set Hash32))
forall a. a -> IO (TVar a)
newTVarIO (NESet Hash32 -> Set Hash32
forall a. NESet a -> Set a
NESet.toSet NESet Hash32
hashes0)
TVar (Set Hash32)
dedupeVar <- Set Hash32 -> IO (TVar (Set Hash32))
forall a. a -> IO (TVar a)
newTVarIO Set Hash32
forall a. Set a
Set.empty
WorkerCount
nextWorkerIdVar <- Int -> IO WorkerCount
forall a. a -> IO (TVar a)
newTVarIO Int
0
TVar (Set Int)
workersVar <- Set Int -> IO (TVar (Set Int))
forall a. a -> IO (TVar a)
newTVarIO Set Int
forall a. Set a
Set.empty
TMVar (SyncError UploadEntitiesError)
workerFailedVar <- IO (TMVar (SyncError UploadEntitiesError))
forall a. IO (TMVar a)
newEmptyTMVarIO
(Scope -> IO (Either (SyncError UploadEntitiesError) ()))
-> IO (Either (SyncError UploadEntitiesError) ())
forall a. (Scope -> IO a) -> IO a
Ki.scoped \Scope
scope ->
Scope
-> AuthenticatedHttpClient
-> (forall x. Transaction x -> IO x)
-> TVar (Set Hash32)
-> TVar (Set Hash32)
-> WorkerCount
-> TVar (Set Int)
-> TMVar (SyncError UploadEntitiesError)
-> IO (Either (SyncError UploadEntitiesError) ())
dispatcher
Scope
scope
AuthenticatedHttpClient
authHTTPClient
(Codebase IO Symbol Ann -> Transaction a -> IO a
forall (m :: * -> *) v a b.
MonadIO m =>
Codebase m v a -> Transaction b -> m b
Codebase.runTransaction Codebase IO Symbol Ann
codebase)
TVar (Set Hash32)
hashesVar
TVar (Set Hash32)
dedupeVar
WorkerCount
nextWorkerIdVar
TVar (Set Int)
workersVar
TMVar (SyncError UploadEntitiesError)
workerFailedVar
where
dispatcher ::
Ki.Scope ->
AuthenticatedHttpClient ->
(forall a. Sqlite.Transaction a -> IO a) ->
TVar (Set Hash32) ->
TVar (Set Hash32) ->
TVar Int ->
TVar (Set Int) ->
TMVar (SyncError Share.UploadEntitiesError) ->
IO (Either (SyncError Share.UploadEntitiesError) ())
dispatcher :: Scope
-> AuthenticatedHttpClient
-> (forall x. Transaction x -> IO x)
-> TVar (Set Hash32)
-> TVar (Set Hash32)
-> WorkerCount
-> TVar (Set Int)
-> TMVar (SyncError UploadEntitiesError)
-> IO (Either (SyncError UploadEntitiesError) ())
dispatcher Scope
scope AuthenticatedHttpClient
httpClient forall x. Transaction x -> IO x
runTransaction TVar (Set Hash32)
hashesVar TVar (Set Hash32)
dedupeVar WorkerCount
nextWorkerIdVar TVar (Set Int)
workersVar TMVar (SyncError UploadEntitiesError)
workerFailedVar = do
IO (Either (SyncError UploadEntitiesError) ())
loop
where
loop :: IO (Either (SyncError Share.UploadEntitiesError) ())
loop :: IO (Either (SyncError UploadEntitiesError) ())
loop =
[STM UploadDispatcherJob]
-> IO (Either (SyncError UploadEntitiesError) ())
doJob [STM UploadDispatcherJob
checkForFailureMode, STM UploadDispatcherJob
dispatchWorkMode, STM UploadDispatcherJob
checkIfDoneMode]
doJob :: [STM UploadDispatcherJob] -> IO (Either (SyncError Share.UploadEntitiesError) ())
doJob :: [STM UploadDispatcherJob]
-> IO (Either (SyncError UploadEntitiesError) ())
doJob [STM UploadDispatcherJob]
jobs =
STM UploadDispatcherJob -> IO UploadDispatcherJob
forall a. STM a -> IO a
atomically ([STM UploadDispatcherJob] -> STM UploadDispatcherJob
forall (t :: * -> *) (f :: * -> *) a.
(Foldable t, Alternative f) =>
t (f a) -> f a
asum [STM UploadDispatcherJob]
jobs) IO UploadDispatcherJob
-> (UploadDispatcherJob
-> IO (Either (SyncError UploadEntitiesError) ()))
-> IO (Either (SyncError UploadEntitiesError) ())
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
UploadDispatcherReturnFailure SyncError UploadEntitiesError
err -> Either (SyncError UploadEntitiesError) ()
-> IO (Either (SyncError UploadEntitiesError) ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (SyncError UploadEntitiesError
-> Either (SyncError UploadEntitiesError) ()
forall a b. a -> Either a b
Left SyncError UploadEntitiesError
err)
UploadDispatcherForkWorkerWhenAvailable NESet Hash32
hashes -> [STM UploadDispatcherJob]
-> IO (Either (SyncError UploadEntitiesError) ())
doJob [NESet Hash32 -> STM UploadDispatcherJob
forkWorkerMode NESet Hash32
hashes, STM UploadDispatcherJob
checkForFailureMode]
UploadDispatcherForkWorker NESet Hash32
hashes -> do
Int
workerId <-
STM Int -> IO Int
forall a. STM a -> IO a
atomically do
Int
workerId <- WorkerCount -> STM Int
forall a. TVar a -> STM a
readTVar WorkerCount
nextWorkerIdVar
WorkerCount -> Int -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar WorkerCount
nextWorkerIdVar (Int -> STM ()) -> Int -> STM ()
forall a b. (a -> b) -> a -> b
$! Int
workerId Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1
TVar (Set Int) -> (Set Int -> Set Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Set Int)
workersVar (Int -> Set Int -> Set Int
forall a. Ord a => a -> Set a -> Set a
Set.insert Int
workerId)
pure Int
workerId
Thread ()
_ <-
forall a. Scope -> IO a -> IO (Thread a)
Ki.fork @() Scope
scope do
AuthenticatedHttpClient
-> (forall x. Transaction x -> IO x)
-> TVar (Set Hash32)
-> TVar (Set Hash32)
-> TVar (Set Int)
-> TMVar (SyncError UploadEntitiesError)
-> Int
-> NESet Hash32
-> IO ()
worker AuthenticatedHttpClient
httpClient Transaction a -> IO a
forall x. Transaction x -> IO x
runTransaction TVar (Set Hash32)
hashesVar TVar (Set Hash32)
dedupeVar TVar (Set Int)
workersVar TMVar (SyncError UploadEntitiesError)
workerFailedVar Int
workerId NESet Hash32
hashes
IO (Either (SyncError UploadEntitiesError) ())
loop
UploadDispatcherJob
UploadDispatcherDone -> Either (SyncError UploadEntitiesError) ()
-> IO (Either (SyncError UploadEntitiesError) ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (() -> Either (SyncError UploadEntitiesError) ()
forall a b. b -> Either a b
Right ())
checkForFailureMode :: STM UploadDispatcherJob
checkForFailureMode :: STM UploadDispatcherJob
checkForFailureMode = do
SyncError UploadEntitiesError
err <- TMVar (SyncError UploadEntitiesError)
-> STM (SyncError UploadEntitiesError)
forall a. TMVar a -> STM a
readTMVar TMVar (SyncError UploadEntitiesError)
workerFailedVar
pure (SyncError UploadEntitiesError -> UploadDispatcherJob
UploadDispatcherReturnFailure SyncError UploadEntitiesError
err)
dispatchWorkMode :: STM UploadDispatcherJob
dispatchWorkMode :: STM UploadDispatcherJob
dispatchWorkMode = do
Set Hash32
hashes <- TVar (Set Hash32) -> STM (Set Hash32)
forall a. TVar a -> STM a
readTVar TVar (Set Hash32)
hashesVar
Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Set Hash32 -> Bool
forall a. Set a -> Bool
Set.null Set Hash32
hashes) STM ()
forall a. STM a
retry
let (Set Hash32
hashes1, Set Hash32
hashes2) = Int -> Set Hash32 -> (Set Hash32, Set Hash32)
forall a. Int -> Set a -> (Set a, Set a)
Set.splitAt Int
syncChunkSize Set Hash32
hashes
TVar (Set Hash32) -> (Set Hash32 -> Set Hash32) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Set Hash32)
dedupeVar (Set Hash32 -> Set Hash32 -> Set Hash32
forall a. Ord a => Set a -> Set a -> Set a
Set.union Set Hash32
hashes1)
TVar (Set Hash32) -> Set Hash32 -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Set Hash32)
hashesVar Set Hash32
hashes2
pure (NESet Hash32 -> UploadDispatcherJob
UploadDispatcherForkWorkerWhenAvailable (Set Hash32 -> NESet Hash32
forall a. Set a -> NESet a
NESet.unsafeFromSet Set Hash32
hashes1))
forkWorkerMode :: NESet Hash32 -> STM UploadDispatcherJob
forkWorkerMode :: NESet Hash32 -> STM UploadDispatcherJob
forkWorkerMode NESet Hash32
hashes = do
Set Int
workers <- TVar (Set Int) -> STM (Set Int)
forall a. TVar a -> STM a
readTVar TVar (Set Int)
workersVar
Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Set Int -> Int
forall a. Set a -> Int
Set.size Set Int
workers Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
maxSimultaneousPushWorkers) STM ()
forall a. STM a
retry
pure (NESet Hash32 -> UploadDispatcherJob
UploadDispatcherForkWorker NESet Hash32
hashes)
checkIfDoneMode :: STM UploadDispatcherJob
checkIfDoneMode :: STM UploadDispatcherJob
checkIfDoneMode = do
Set Int
workers <- TVar (Set Int) -> STM (Set Int)
forall a. TVar a -> STM a
readTVar TVar (Set Int)
workersVar
Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not (Set Int -> Bool
forall a. Set a -> Bool
Set.null Set Int
workers)) STM ()
forall a. STM a
retry
pure UploadDispatcherJob
UploadDispatcherDone
worker ::
AuthenticatedHttpClient ->
(forall a. Sqlite.Transaction a -> IO a) ->
TVar (Set Hash32) ->
TVar (Set Hash32) ->
TVar (Set Int) ->
TMVar (SyncError Share.UploadEntitiesError) ->
Int ->
NESet Hash32 ->
IO ()
worker :: AuthenticatedHttpClient
-> (forall x. Transaction x -> IO x)
-> TVar (Set Hash32)
-> TVar (Set Hash32)
-> TVar (Set Int)
-> TMVar (SyncError UploadEntitiesError)
-> Int
-> NESet Hash32
-> IO ()
worker AuthenticatedHttpClient
httpClient forall x. Transaction x -> IO x
runTransaction TVar (Set Hash32)
hashesVar TVar (Set Hash32)
dedupeVar TVar (Set Int)
workersVar TMVar (SyncError UploadEntitiesError)
workerFailedVar Int
workerId NESet Hash32
hashes = do
NEMap Hash32 (Entity Text Hash32 Hash32)
entities <-
(NonEmpty (Hash32, Entity Text Hash32 Hash32)
-> NEMap Hash32 (Entity Text Hash32 Hash32))
-> IO (NonEmpty (Hash32, Entity Text Hash32 Hash32))
-> IO (NEMap Hash32 (Entity Text Hash32 Hash32))
forall a b. (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap NonEmpty (Hash32, Entity Text Hash32 Hash32)
-> NEMap Hash32 (Entity Text Hash32 Hash32)
forall k a. Eq k => NonEmpty (k, a) -> NEMap k a
NEMap.fromAscList do
Transaction (NonEmpty (Hash32, Entity Text Hash32 Hash32))
-> IO (NonEmpty (Hash32, Entity Text Hash32 Hash32))
forall x. Transaction x -> IO x
runTransaction do
NonEmpty Hash32
-> (Hash32 -> Transaction (Hash32, Entity Text Hash32 Hash32))
-> Transaction (NonEmpty (Hash32, Entity Text Hash32 Hash32))
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
t a -> (a -> f b) -> f (t b)
for (NESet Hash32 -> NonEmpty Hash32
forall a. NESet a -> NonEmpty a
NESet.toAscList NESet Hash32
hashes) \Hash32
hash -> do
Entity Text Hash32 Hash32
entity <- Hash32 -> Transaction (Entity Text Hash32 Hash32)
expectEntity Hash32
hash
pure (Hash32
hash, Entity Text Hash32 Hash32
entity)
Either (SyncError UploadEntitiesError) (Set Hash32)
result <-
AuthenticatedHttpClient
-> BaseUrl
-> UploadEntitiesRequest
-> IO (Either CodeserverTransportError UploadEntitiesResponse)
httpUploadEntities AuthenticatedHttpClient
httpClient BaseUrl
unisonShareUrl Share.UploadEntitiesRequest {NEMap Hash32 (Entity Text Hash32 Hash32)
entities :: NEMap Hash32 (Entity Text Hash32 Hash32)
$sel:entities:UploadEntitiesRequest :: NEMap Hash32 (Entity Text Hash32 Hash32)
entities, RepoInfo
repoInfo :: RepoInfo
$sel:repoInfo:UploadEntitiesRequest :: RepoInfo
repoInfo} IO (Either CodeserverTransportError UploadEntitiesResponse)
-> (Either CodeserverTransportError UploadEntitiesResponse
-> Either (SyncError UploadEntitiesError) (Set Hash32))
-> IO (Either (SyncError UploadEntitiesError) (Set Hash32))
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
Left CodeserverTransportError
err -> SyncError UploadEntitiesError
-> Either (SyncError UploadEntitiesError) (Set Hash32)
forall a b. a -> Either a b
Left (CodeserverTransportError -> SyncError UploadEntitiesError
forall e. CodeserverTransportError -> SyncError e
TransportError CodeserverTransportError
err)
Right UploadEntitiesResponse
response ->
case UploadEntitiesResponse
response of
UploadEntitiesResponse
Share.UploadEntitiesSuccess -> Set Hash32 -> Either (SyncError UploadEntitiesError) (Set Hash32)
forall a b. b -> Either a b
Right Set Hash32
forall a. Set a
Set.empty
Share.UploadEntitiesFailure UploadEntitiesError
err ->
case UploadEntitiesError
err of
Share.UploadEntitiesError'NeedDependencies (Share.NeedDependencies NESet Hash32
moreHashes) ->
Set Hash32 -> Either (SyncError UploadEntitiesError) (Set Hash32)
forall a b. b -> Either a b
Right (NESet Hash32 -> Set Hash32
forall a. NESet a -> Set a
NESet.toSet NESet Hash32
moreHashes)
UploadEntitiesError
err -> SyncError UploadEntitiesError
-> Either (SyncError UploadEntitiesError) (Set Hash32)
forall a b. a -> Either a b
Left (UploadEntitiesError -> SyncError UploadEntitiesError
forall e. e -> SyncError e
SyncError UploadEntitiesError
err)
case Either (SyncError UploadEntitiesError) (Set Hash32)
result of
Left SyncError UploadEntitiesError
err -> IO Bool -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (TMVar (SyncError UploadEntitiesError)
-> SyncError UploadEntitiesError -> STM Bool
forall a. TMVar a -> a -> STM Bool
tryPutTMVar TMVar (SyncError UploadEntitiesError)
workerFailedVar SyncError UploadEntitiesError
err))
Right Set Hash32
moreHashes -> do
Int -> IO ()
uploadedCallback (NESet Hash32 -> Int
forall a. NESet a -> Int
NESet.size NESet Hash32
hashes)
Maybe Int
maybeYoungestWorkerThatWasAlive <-
STM (Maybe Int) -> IO (Maybe Int)
forall a. STM a -> IO a
atomically do
!Set Int
workers <- Int -> Set Int -> Set Int
forall a. Ord a => a -> Set a -> Set a
Set.delete Int
workerId (Set Int -> Set Int) -> STM (Set Int) -> STM (Set Int)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar (Set Int) -> STM (Set Int)
forall a. TVar a -> STM a
readTVar TVar (Set Int)
workersVar
TVar (Set Int) -> Set Int -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Set Int)
workersVar Set Int
workers
Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not (Set Hash32 -> Bool
forall a. Set a -> Bool
Set.null Set Hash32
moreHashes)) do
Set Hash32
dedupe <- TVar (Set Hash32) -> STM (Set Hash32)
forall a. TVar a -> STM a
readTVar TVar (Set Hash32)
dedupeVar
Set Hash32
hashes0 <- TVar (Set Hash32) -> STM (Set Hash32)
forall a. TVar a -> STM a
readTVar TVar (Set Hash32)
hashesVar
TVar (Set Hash32) -> Set Hash32 -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Set Hash32)
hashesVar (Set Hash32 -> STM ()) -> Set Hash32 -> STM ()
forall a b. (a -> b) -> a -> b
$! Set Hash32 -> Set Hash32 -> Set Hash32
forall a. Ord a => Set a -> Set a -> Set a
Set.union (Set Hash32 -> Set Hash32 -> Set Hash32
forall a. Ord a => Set a -> Set a -> Set a
Set.difference Set Hash32
moreHashes Set Hash32
dedupe) Set Hash32
hashes0
pure (Set Int -> Maybe Int
forall a. Set a -> Maybe a
Set.lookupMax Set Int
workers)
Maybe Int -> (Int -> IO ()) -> IO ()
forall (m :: * -> *) a.
Applicative m =>
Maybe a -> (a -> m ()) -> m ()
whenJust Maybe Int
maybeYoungestWorkerThatWasAlive \Int
youngestWorkerThatWasAlive -> do
STM () -> IO ()
forall a. STM a -> IO a
atomically do
Set Int
workers <- TVar (Set Int) -> STM (Set Int)
forall a. TVar a -> STM a
readTVar TVar (Set Int)
workersVar
Maybe Int -> (Int -> STM ()) -> STM ()
forall (m :: * -> *) a.
Applicative m =>
Maybe a -> (a -> m ()) -> m ()
whenJust (Set Int -> Maybe Int
forall a. Set a -> Maybe a
Set.lookupMin Set Int
workers) \Int
oldestWorkerAlive ->
Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
oldestWorkerAlive Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
youngestWorkerThatWasAlive) STM ()
forall a. STM a
retry
STM () -> IO ()
forall a. STM a -> IO a
atomically (TVar (Set Hash32) -> (Set Hash32 -> Set Hash32) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Set Hash32)
dedupeVar (Set Hash32 -> Set Hash32 -> Set Hash32
forall a. Ord a => Set a -> Set a -> Set a
`Set.difference` (NESet Hash32 -> Set Hash32
forall a. NESet a -> Set a
NESet.toSet NESet Hash32
hashes)))
elaborateHashes :: NESet Hash32 -> Sqlite.Transaction (Set Share.HashJWT)
elaborateHashes :: NESet Hash32 -> Transaction (Set HashJWT)
elaborateHashes NESet Hash32
hashes =
NonEmpty Hash32 -> Transaction [Text]
Q.elaborateHashes (NESet Hash32 -> NonEmpty Hash32
forall a. NESet a -> NonEmpty a
NESet.toList NESet Hash32
hashes) Transaction [Text]
-> ([Text] -> Set HashJWT) -> Transaction (Set HashJWT)
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> [HashJWT] -> Set HashJWT
forall a. Ord a => [a] -> Set a
Set.fromList ([HashJWT] -> Set HashJWT)
-> ([Text] -> [HashJWT]) -> [Text] -> Set HashJWT
forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. Coercible a b => a -> b
forall a b. Coercible a b => a -> b
coerce @[Text] @[Share.HashJWT]
upsertEntitySomewhere ::
Hash32 ->
Share.Entity Text Hash32 Share.HashJWT ->
Sqlite.Transaction Q.EntityLocation
upsertEntitySomewhere :: Hash32 -> Entity Text Hash32 HashJWT -> Transaction EntityLocation
upsertEntitySomewhere Hash32
hash Entity Text Hash32 HashJWT
entity =
Hash32 -> Transaction (Maybe EntityLocation)
Q.entityLocation Hash32
hash Transaction (Maybe EntityLocation)
-> (Maybe EntityLocation -> Transaction EntityLocation)
-> Transaction EntityLocation
forall a b. Transaction a -> (a -> Transaction b) -> Transaction b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Just EntityLocation
location -> EntityLocation -> Transaction EntityLocation
forall a. a -> Transaction a
forall (f :: * -> *) a. Applicative f => a -> f a
pure EntityLocation
location
Maybe EntityLocation
Nothing -> do
Map Hash32 HashJWT
missingDependencies1 :: Map Hash32 Share.HashJWT <-
Entity Text Hash32 HashJWT -> Set HashJWT
forall hash text noSyncHash.
Ord hash =>
Entity text noSyncHash hash -> Set hash
Share.entityDependencies Entity Text Hash32 HashJWT
entity
Set HashJWT
-> (Set HashJWT -> Transaction (Map Hash32 HashJWT))
-> Transaction (Map Hash32 HashJWT)
forall a b. a -> (a -> b) -> b
& (HashJWT -> Transaction (Map Hash32 HashJWT))
-> Set HashJWT -> Transaction (Map Hash32 HashJWT)
forall (m :: * -> *) (f :: * -> *) b a.
(Monad m, Foldable f, Monoid b) =>
(a -> m b) -> f a -> m b
foldMapM
( \HashJWT
hashJwt -> do
let hash :: Hash32
hash = HashJWT -> Hash32
Share.hashJWTHash HashJWT
hashJwt
Hash32 -> Transaction Bool
Q.entityExists Hash32
hash Transaction Bool
-> (Bool -> Map Hash32 HashJWT) -> Transaction (Map Hash32 HashJWT)
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
Bool
True -> Map Hash32 HashJWT
forall k a. Map k a
Map.empty
Bool
False -> Hash32 -> HashJWT -> Map Hash32 HashJWT
forall k a. k -> a -> Map k a
Map.singleton Hash32
hash HashJWT
hashJwt
)
case Map Hash32 HashJWT -> Maybe (NEMap Hash32 HashJWT)
forall k a. Map k a -> Maybe (NEMap k a)
NEMap.nonEmptyMap Map Hash32 HashJWT
missingDependencies1 of
Maybe (NEMap Hash32 HashJWT)
Nothing -> do
Either CausalHashId ObjectId
_id <- HashHandle
-> Hash32
-> TempEntity
-> Transaction (Either CausalHashId ObjectId)
Q.saveTempEntityInMain HashHandle
v2HashHandle Hash32
hash ((HashJWT -> Hash32) -> Entity Text Hash32 HashJWT -> TempEntity
forall hash.
(hash -> Hash32) -> Entity Text Hash32 hash -> TempEntity
entityToTempEntity HashJWT -> Hash32
Share.hashJWTHash Entity Text Hash32 HashJWT
entity)
pure EntityLocation
Q.EntityInMainStorage
Just NEMap Hash32 HashJWT
missingDependencies -> do
Hash32 -> TempEntity -> NEMap Hash32 Text -> Transaction ()
Q.insertTempEntity
Hash32
hash
((HashJWT -> Hash32) -> Entity Text Hash32 HashJWT -> TempEntity
forall hash.
(hash -> Hash32) -> Entity Text Hash32 hash -> TempEntity
entityToTempEntity HashJWT -> Hash32
Share.hashJWTHash Entity Text Hash32 HashJWT
entity)
( forall a b. Coercible a b => a -> b
forall a b. Coercible a b => a -> b
coerce
@(NEMap Hash32 Share.HashJWT)
@(NEMap Hash32 Text)
NEMap Hash32 HashJWT
missingDependencies
)
pure EntityLocation
Q.EntityInTempStorage
httpGetCausalHashByPath ::
Auth.AuthenticatedHttpClient ->
BaseUrl ->
Share.GetCausalHashByPathRequest ->
IO (Either CodeserverTransportError Share.GetCausalHashByPathResponse)
httpDownloadEntities ::
Auth.AuthenticatedHttpClient ->
BaseUrl ->
Share.DownloadEntitiesRequest ->
IO (Either CodeserverTransportError Share.DownloadEntitiesResponse)
httpUploadEntities ::
Auth.AuthenticatedHttpClient ->
BaseUrl ->
Share.UploadEntitiesRequest ->
IO (Either CodeserverTransportError Share.UploadEntitiesResponse)
( AuthenticatedHttpClient
-> BaseUrl
-> GetCausalHashByPathRequest
-> IO (Either CodeserverTransportError GetCausalHashByPathResponse)
httpGetCausalHashByPath,
AuthenticatedHttpClient
-> BaseUrl
-> DownloadEntitiesRequest
-> IO (Either CodeserverTransportError DownloadEntitiesResponse)
httpDownloadEntities,
AuthenticatedHttpClient
-> BaseUrl
-> UploadEntitiesRequest
-> IO (Either CodeserverTransportError UploadEntitiesResponse)
httpUploadEntities
) =
let ( GetCausalHashByPathRequest
-> ReaderT
ClientEnv
(ExceptT CodeserverTransportError IO)
GetCausalHashByPathResponse
httpGetCausalHashByPath
Servant.:<|> DownloadEntitiesRequest
-> ReaderT
ClientEnv
(ExceptT CodeserverTransportError IO)
DownloadEntitiesResponse
httpDownloadEntities
Servant.:<|> UploadEntitiesRequest
-> ReaderT
ClientEnv
(ExceptT CodeserverTransportError IO)
UploadEntitiesResponse
httpUploadEntities
) =
let pp :: Proxy ("ucm" Servant.:> "v1" Servant.:> "sync" Servant.:> Share.API)
pp :: Proxy ("ucm" :> ("v1" :> ("sync" :> API)))
pp = Proxy ("ucm" :> ("v1" :> ("sync" :> API)))
forall {k} (t :: k). Proxy t
Proxy
in Proxy ("ucm" :> ("v1" :> ("sync" :> API)))
-> (forall a.
ClientM a
-> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) a)
-> Client ClientM ("ucm" :> ("v1" :> ("sync" :> API)))
-> Client
(ReaderT ClientEnv (ExceptT CodeserverTransportError IO))
("ucm" :> ("v1" :> ("sync" :> API)))
forall api (m :: * -> *) (n :: * -> *).
HasClient ClientM api =>
Proxy api -> (forall a. m a -> n a) -> Client m api -> Client n api
Servant.hoistClient Proxy ("ucm" :> ("v1" :> ("sync" :> API)))
pp ClientM a
-> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) a
forall a.
ClientM a
-> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) a
hoist (Proxy ("ucm" :> ("v1" :> ("sync" :> API)))
-> Client ClientM ("ucm" :> ("v1" :> ("sync" :> API)))
forall api.
HasClient ClientM api =>
Proxy api -> Client ClientM api
Servant.client Proxy ("ucm" :> ("v1" :> ("sync" :> API)))
pp)
in ( (GetCausalHashByPathRequest
-> ReaderT
ClientEnv
(ExceptT CodeserverTransportError IO)
GetCausalHashByPathResponse)
-> AuthenticatedHttpClient
-> BaseUrl
-> GetCausalHashByPathRequest
-> IO (Either CodeserverTransportError GetCausalHashByPathResponse)
forall req resp.
(req
-> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) resp)
-> AuthenticatedHttpClient
-> BaseUrl
-> req
-> IO (Either CodeserverTransportError resp)
go GetCausalHashByPathRequest
-> ReaderT
ClientEnv
(ExceptT CodeserverTransportError IO)
GetCausalHashByPathResponse
httpGetCausalHashByPath,
(DownloadEntitiesRequest
-> ReaderT
ClientEnv
(ExceptT CodeserverTransportError IO)
DownloadEntitiesResponse)
-> AuthenticatedHttpClient
-> BaseUrl
-> DownloadEntitiesRequest
-> IO (Either CodeserverTransportError DownloadEntitiesResponse)
forall req resp.
(req
-> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) resp)
-> AuthenticatedHttpClient
-> BaseUrl
-> req
-> IO (Either CodeserverTransportError resp)
go DownloadEntitiesRequest
-> ReaderT
ClientEnv
(ExceptT CodeserverTransportError IO)
DownloadEntitiesResponse
httpDownloadEntities,
(UploadEntitiesRequest
-> ReaderT
ClientEnv
(ExceptT CodeserverTransportError IO)
UploadEntitiesResponse)
-> AuthenticatedHttpClient
-> BaseUrl
-> UploadEntitiesRequest
-> IO (Either CodeserverTransportError UploadEntitiesResponse)
forall req resp.
(req
-> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) resp)
-> AuthenticatedHttpClient
-> BaseUrl
-> req
-> IO (Either CodeserverTransportError resp)
go UploadEntitiesRequest
-> ReaderT
ClientEnv
(ExceptT CodeserverTransportError IO)
UploadEntitiesResponse
httpUploadEntities
)
where
hoist :: Servant.ClientM a -> ReaderT Servant.ClientEnv (ExceptT CodeserverTransportError IO) a
hoist :: forall a.
ClientM a
-> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) a
hoist ClientM a
m = do
ClientEnv
clientEnv <- ReaderT ClientEnv (ExceptT CodeserverTransportError IO) ClientEnv
forall (m :: * -> *) r. Monad m => ReaderT r m r
Reader.ask
IO (Either ClientError a)
-> ReaderT
ClientEnv
(ExceptT CodeserverTransportError IO)
(Either ClientError a)
forall a.
IO a -> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (ClientM a -> ClientEnv -> IO (Either ClientError a)
forall a. ClientM a -> ClientEnv -> IO (Either ClientError a)
Servant.runClientM ClientM a
m ClientEnv
clientEnv) ReaderT
ClientEnv
(ExceptT CodeserverTransportError IO)
(Either ClientError a)
-> (Either ClientError a
-> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) a)
-> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) a
forall a b.
ReaderT ClientEnv (ExceptT CodeserverTransportError IO) a
-> (a -> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) b)
-> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Right a
a -> a -> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) a
forall a.
a -> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
a
Left ClientError
err -> do
DebugFlag
-> String
-> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) ()
forall (m :: * -> *). Monad m => DebugFlag -> String -> m ()
Debug.debugLogM DebugFlag
Debug.Sync (ClientError -> String
forall a. Show a => a -> String
show ClientError
err)
CodeserverTransportError
-> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) a
forall a.
CodeserverTransportError
-> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError case ClientError
err of
Servant.FailureResponse RequestF () (BaseUrl, ByteString)
_req Response
resp ->
case Status -> Int
HTTP.statusCode (Status -> Int) -> Status -> Int
forall a b. (a -> b) -> a -> b
$ Response -> Status
forall a. ResponseF a -> Status
Servant.responseStatusCode Response
resp of
Int
401 -> BaseUrl -> CodeserverTransportError
Unauthenticated (ClientEnv -> BaseUrl
Servant.baseUrl ClientEnv
clientEnv)
Int
403 -> Text -> CodeserverTransportError
PermissionDenied (Text -> Text
Text.Lazy.toStrict (Text -> Text) -> (ByteString -> Text) -> ByteString -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Text
Text.Lazy.decodeUtf8 (ByteString -> Text) -> ByteString -> Text
forall a b. (a -> b) -> a -> b
$ Response -> ByteString
forall a. ResponseF a -> a
Servant.responseBody Response
resp)
Int
408 -> CodeserverTransportError
Timeout
Int
429 -> CodeserverTransportError
RateLimitExceeded
Int
504 -> CodeserverTransportError
Timeout
Int
_ -> Response -> CodeserverTransportError
UnexpectedResponse Response
resp
Servant.DecodeFailure Text
msg Response
resp -> Text -> Response -> CodeserverTransportError
DecodeFailure Text
msg Response
resp
Servant.UnsupportedContentType MediaType
_ct Response
resp -> Response -> CodeserverTransportError
UnexpectedResponse Response
resp
Servant.InvalidContentTypeHeader Response
resp -> Response -> CodeserverTransportError
UnexpectedResponse Response
resp
Servant.ConnectionError SomeException
_ -> BaseUrl -> CodeserverTransportError
UnreachableCodeserver (ClientEnv -> BaseUrl
Servant.baseUrl ClientEnv
clientEnv)
go ::
(req -> ReaderT Servant.ClientEnv (ExceptT CodeserverTransportError IO) resp) ->
Auth.AuthenticatedHttpClient ->
BaseUrl ->
req ->
IO (Either CodeserverTransportError resp)
go :: forall req resp.
(req
-> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) resp)
-> AuthenticatedHttpClient
-> BaseUrl
-> req
-> IO (Either CodeserverTransportError resp)
go req -> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) resp
f (Auth.AuthenticatedHttpClient Manager
httpClient) BaseUrl
unisonShareUrl req
req =
(Manager -> BaseUrl -> ClientEnv
Servant.mkClientEnv Manager
httpClient BaseUrl
unisonShareUrl)
{ Servant.makeClientRequest = \BaseUrl
url Request
request ->
(BaseUrl -> Request -> IO Request
Servant.defaultMakeClientRequest BaseUrl
url Request
request)
IO Request -> (Request -> Request) -> IO Request
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \Request
r ->
Request
r
{ Http.Client.responseTimeout = Http.Client.responseTimeoutNone
}
}
ClientEnv
-> (ClientEnv -> ExceptT CodeserverTransportError IO resp)
-> ExceptT CodeserverTransportError IO resp
forall a b. a -> (a -> b) -> b
& ReaderT ClientEnv (ExceptT CodeserverTransportError IO) resp
-> ClientEnv -> ExceptT CodeserverTransportError IO resp
forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
runReaderT (req -> ReaderT ClientEnv (ExceptT CodeserverTransportError IO) resp
f req
req)
ExceptT CodeserverTransportError IO resp
-> (ExceptT CodeserverTransportError IO resp
-> IO (Either CodeserverTransportError resp))
-> IO (Either CodeserverTransportError resp)
forall a b. a -> (a -> b) -> b
& ExceptT CodeserverTransportError IO resp
-> IO (Either CodeserverTransportError resp)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT