{-# LANGUAGE DataKinds #-}
{-# LANGUAGE TypeOperators #-}
module Unison.Share.SyncV2
( syncFromFile,
syncToFile,
syncFromCodebase,
syncFromCodeserver,
)
where
import Codec.Serialise qualified as CBOR
import Conduit (ConduitT)
import Conduit qualified as C
import Control.Lens
import Control.Monad.Except
import Control.Monad.Reader (ask)
import Control.Monad.ST (ST, stToIO)
import Control.Monad.State
import Data.Attoparsec.ByteString qualified as A
import Data.Attoparsec.ByteString.Char8 qualified as A8
import Data.ByteString qualified as BS
import Data.ByteString.Lazy qualified as BL
import Data.Conduit.Attoparsec qualified as C
import Data.Conduit.Combinators qualified as C
import Data.Conduit.List qualified as CL
import Data.Conduit.Zlib qualified as C
import Data.Foldable qualified as Foldable
import Data.Graph qualified as Graph
import Data.Map qualified as Map
import Data.Proxy
import Data.Set qualified as Set
import Data.Text.IO qualified as Text
import Data.Text.Lazy qualified as Text.Lazy
import Data.Text.Lazy.Encoding qualified as Text.Lazy
import Data.Vector (Vector)
import Data.Vector qualified as Vector
import Network.HTTP.Client qualified as Http.Client
import Network.HTTP.Types qualified as HTTP
import Servant.API qualified as Servant
import Servant.Client.Streaming qualified as Servant
import Servant.Conduit ()
import Servant.Types.SourceT qualified as Servant
import System.Console.Regions qualified as Console.Regions
import U.Codebase.HashTags (CausalHash)
import U.Codebase.Sqlite.Queries qualified as Q
import U.Codebase.Sqlite.TempEntity (TempEntity)
import U.Codebase.Sqlite.V2.HashHandle (v2HashHandle)
import Unison.Auth.HTTPClient qualified as Auth
import Unison.Cli.Monad (Cli)
import Unison.Cli.Monad qualified as Cli
import Unison.Codebase qualified as Codebase
import Unison.Hash32 (Hash32)
import Unison.Prelude
import Unison.Share.API.Hash qualified as Share
import Unison.Share.ExpectedHashMismatches (expectedCausalHashMismatches, expectedComponentHashMismatches)
import Unison.Share.Sync.Types
import Unison.Sqlite qualified as Sqlite
import Unison.Sync.Common (causalHashToHash32, hash32ToCausalHash, tempEntityToEntity)
import Unison.Sync.Common qualified as Sync
import Unison.Sync.EntityValidation qualified as EV
import Unison.Sync.Types qualified as Share
import Unison.Sync.Types qualified as Sync
import Unison.SyncV2.API (Routes (downloadEntitiesStream))
import Unison.SyncV2.API qualified as SyncV2
import Unison.SyncV2.Types (CBORBytes, CBORStream, DependencyType (..))
import Unison.SyncV2.Types qualified as SyncV2
import Unison.Util.Servant.CBOR qualified as CBOR
import Unison.Util.Timing qualified as Timing
import UnliftIO qualified as IO
type Stream i o = ConduitT i o StreamM ()
type SyncErr = SyncError SyncV2.PullError
type StreamM = (ExceptT SyncErr (C.ResourceT IO))
batchSize :: Int
batchSize :: Int
batchSize = Int
5000
syncToFile ::
Codebase.Codebase IO v a ->
CausalHash ->
Maybe SyncV2.BranchRef ->
FilePath ->
IO (Either SyncErr ())
syncToFile :: forall v a.
Codebase IO v a
-> CausalHash
-> Maybe BranchRef
-> FilePath
-> IO (Either SyncErr ())
syncToFile Codebase IO v a
codebase CausalHash
rootHash Maybe BranchRef
mayBranchRef FilePath
destFilePath = do
IO (Either SyncErr ()) -> IO (Either SyncErr ())
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either SyncErr ()) -> IO (Either SyncErr ()))
-> IO (Either SyncErr ()) -> IO (Either SyncErr ())
forall a b. (a -> b) -> a -> b
$ Codebase IO v a -> forall x. (Connection -> IO x) -> IO x
forall (m :: * -> *) v a.
Codebase m v a -> forall x. (Connection -> m x) -> m x
Codebase.withConnection Codebase IO v a
codebase \Connection
conn -> do
ResourceT IO (Either SyncErr ()) -> IO (Either SyncErr ())
forall (m :: * -> *) a. MonadUnliftIO m => ResourceT m a -> m a
C.runResourceT (ResourceT IO (Either SyncErr ()) -> IO (Either SyncErr ()))
-> ResourceT IO (Either SyncErr ()) -> IO (Either SyncErr ())
forall a b. (a -> b) -> a -> b
$
Connection
-> CausalHash
-> Maybe BranchRef
-> (Int
-> Stream () DownloadEntitiesChunk
-> ResourceT IO (Either SyncErr ()))
-> ResourceT IO (Either SyncErr ())
forall (m :: * -> *) r.
MonadIO m =>
Connection
-> CausalHash
-> Maybe BranchRef
-> (Int -> Stream () DownloadEntitiesChunk -> m r)
-> m r
withCodebaseEntityStream Connection
conn CausalHash
rootHash Maybe BranchRef
mayBranchRef \Int
mayTotal Stream () DownloadEntitiesChunk
stream -> do
Maybe Int
-> (ConduitT
DownloadEntitiesChunk
DownloadEntitiesChunk
(ExceptT SyncErr (ResourceT IO))
()
-> ResourceT IO (Either SyncErr ()))
-> ResourceT IO (Either SyncErr ())
forall (m :: * -> *) (n :: * -> *) i a.
(MonadIO m, MonadUnliftIO n) =>
Maybe Int -> (ConduitT i i m () -> n a) -> n a
withStreamProgressCallback (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
mayTotal) \ConduitT
DownloadEntitiesChunk
DownloadEntitiesChunk
(ExceptT SyncErr (ResourceT IO))
()
countC -> ExceptT SyncErr (ResourceT IO) ()
-> ResourceT IO (Either SyncErr ())
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT do
ConduitT () Void (ExceptT SyncErr (ResourceT IO)) ()
-> ExceptT SyncErr (ResourceT IO) ()
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
C.runConduit (ConduitT () Void (ExceptT SyncErr (ResourceT IO)) ()
-> ExceptT SyncErr (ResourceT IO) ())
-> ConduitT () Void (ExceptT SyncErr (ResourceT IO)) ()
-> ExceptT SyncErr (ResourceT IO) ()
forall a b. (a -> b) -> a -> b
$
Stream () DownloadEntitiesChunk
stream
Stream () DownloadEntitiesChunk
-> ConduitT
DownloadEntitiesChunk Void (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT () Void (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| ConduitT
DownloadEntitiesChunk
DownloadEntitiesChunk
(ExceptT SyncErr (ResourceT IO))
()
countC
ConduitT
DownloadEntitiesChunk
DownloadEntitiesChunk
(ExceptT SyncErr (ResourceT IO))
()
-> ConduitT
DownloadEntitiesChunk Void (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT
DownloadEntitiesChunk Void (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| (DownloadEntitiesChunk -> ByteString)
-> ConduitT
DownloadEntitiesChunk
ByteString
(ExceptT SyncErr (ResourceT IO))
()
forall (m :: * -> *) a b. Monad m => (a -> b) -> ConduitT a b m ()
C.map (ByteString -> ByteString
BL.toStrict (ByteString -> ByteString)
-> (DownloadEntitiesChunk -> ByteString)
-> DownloadEntitiesChunk
-> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DownloadEntitiesChunk -> ByteString
forall a. Serialise a => a -> ByteString
CBOR.serialise)
ConduitT
DownloadEntitiesChunk
ByteString
(ExceptT SyncErr (ResourceT IO))
()
-> ConduitT ByteString Void (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT
DownloadEntitiesChunk Void (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| (forall a. IO a -> ExceptT SyncErr (ResourceT IO) a)
-> ConduitT ByteString ByteString IO ()
-> ConduitT
ByteString ByteString (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) (n :: * -> *) i o r.
Monad m =>
(forall a. m a -> n a) -> ConduitT i o m r -> ConduitT i o n r
C.transPipe IO a -> ExceptT SyncErr (ResourceT IO) a
forall a. IO a -> ExceptT SyncErr (ResourceT IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO ConduitT ByteString ByteString IO ()
forall (m :: * -> *).
(MonadThrow m, PrimMonad m) =>
ConduitT ByteString ByteString m ()
C.gzip
ConduitT ByteString ByteString (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT ByteString Void (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT ByteString Void (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| FilePath
-> ConduitT ByteString Void (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) o.
MonadResource m =>
FilePath -> ConduitT ByteString o m ()
C.sinkFile FilePath
destFilePath
syncFromFile ::
Bool ->
FilePath ->
Cli (Either (SyncError SyncV2.PullError) CausalHash)
syncFromFile :: Bool -> FilePath -> Cli (Either SyncErr CausalHash)
syncFromFile Bool
shouldValidate FilePath
syncFilePath = do
Cli.Env {Codebase IO Symbol Ann
codebase :: Codebase IO Symbol Ann
$sel:codebase:Env :: Env -> Codebase IO Symbol Ann
codebase} <- Cli Env
forall r (m :: * -> *). MonadReader r m => m r
ask
Transaction () -> Cli ()
forall a. Transaction a -> Cli a
Cli.runTransaction Transaction ()
Q.clearTempEntityTables
ExceptT SyncErr Cli CausalHash -> Cli (Either SyncErr CausalHash)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT do
(IO (Either SyncErr CausalHash) -> Cli (Either SyncErr CausalHash))
-> ExceptT SyncErr IO CausalHash -> ExceptT SyncErr Cli CausalHash
forall (m :: * -> *) e a (n :: * -> *) e' b.
(m (Either e a) -> n (Either e' b))
-> ExceptT e m a -> ExceptT e' n b
mapExceptT IO (Either SyncErr CausalHash) -> Cli (Either SyncErr CausalHash)
forall a. IO a -> Cli a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (ExceptT SyncErr IO CausalHash -> ExceptT SyncErr Cli CausalHash)
-> ExceptT SyncErr IO CausalHash -> ExceptT SyncErr Cli CausalHash
forall a b. (a -> b) -> a -> b
$ FilePath
-> ExceptT SyncErr IO CausalHash -> ExceptT SyncErr IO CausalHash
forall (m :: * -> *) a. MonadIO m => FilePath -> m a -> m a
Timing.time FilePath
"File Sync" (ExceptT SyncErr IO CausalHash -> ExceptT SyncErr IO CausalHash)
-> ExceptT SyncErr IO CausalHash -> ExceptT SyncErr IO CausalHash
forall a b. (a -> b) -> a -> b
$ do
StreamInitInfo
header <- (ResourceT IO (Either SyncErr StreamInitInfo)
-> IO (Either SyncErr StreamInitInfo))
-> ExceptT SyncErr (ResourceT IO) StreamInitInfo
-> ExceptT SyncErr IO StreamInitInfo
forall (m :: * -> *) e a (n :: * -> *) e' b.
(m (Either e a) -> n (Either e' b))
-> ExceptT e m a -> ExceptT e' n b
mapExceptT ResourceT IO (Either SyncErr StreamInitInfo)
-> IO (Either SyncErr StreamInitInfo)
forall (m :: * -> *) a. MonadUnliftIO m => ResourceT m a -> m a
C.runResourceT (ExceptT SyncErr (ResourceT IO) StreamInitInfo
-> ExceptT SyncErr IO StreamInitInfo)
-> ExceptT SyncErr (ResourceT IO) StreamInitInfo
-> ExceptT SyncErr IO StreamInitInfo
forall a b. (a -> b) -> a -> b
$ do
let stream :: Stream () DownloadEntitiesChunk
stream = FilePath
-> ConduitT () ByteString (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) i.
MonadResource m =>
FilePath -> ConduitT i ByteString m ()
C.sourceFile FilePath
syncFilePath ConduitT () ByteString (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT
ByteString
DownloadEntitiesChunk
(ExceptT SyncErr (ResourceT IO))
()
-> Stream () DownloadEntitiesChunk
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| ConduitT ByteString ByteString (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *).
(PrimMonad m, MonadThrow m) =>
ConduitT ByteString ByteString m ()
C.ungzip ConduitT ByteString ByteString (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT
ByteString
DownloadEntitiesChunk
(ExceptT SyncErr (ResourceT IO))
()
-> ConduitT
ByteString
DownloadEntitiesChunk
(ExceptT SyncErr (ResourceT IO))
()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| ConduitT
ByteString
DownloadEntitiesChunk
(ExceptT SyncErr (ResourceT IO))
()
forall a. Serialise a => Stream ByteString a
decodeUnframedEntities
(StreamInitInfo
header, Stream () EntityChunk
rest) <- Stream () DownloadEntitiesChunk
-> StreamM (StreamInitInfo, Stream () EntityChunk)
initializeStream Stream () DownloadEntitiesChunk
stream
Bool
-> Codebase IO Symbol Ann
-> StreamInitInfo
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
forall v a.
Bool
-> Codebase IO v a
-> StreamInitInfo
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
streamIntoCodebase Bool
shouldValidate Codebase IO Symbol Ann
codebase StreamInitInfo
header Stream () EntityChunk
rest
pure StreamInitInfo
header
Codebase IO Symbol Ann -> Hash32 -> ExceptT SyncErr IO ()
forall v a. Codebase IO v a -> Hash32 -> ExceptT SyncErr IO ()
afterSyncChecks Codebase IO Symbol Ann
codebase (StreamInitInfo -> Hash32
SyncV2.rootCausalHash StreamInitInfo
header)
CausalHash -> ExceptT SyncErr IO CausalHash
forall a. a -> ExceptT SyncErr IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (CausalHash -> ExceptT SyncErr IO CausalHash)
-> (Hash32 -> CausalHash)
-> Hash32
-> ExceptT SyncErr IO CausalHash
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Hash32 -> CausalHash
hash32ToCausalHash (Hash32 -> ExceptT SyncErr IO CausalHash)
-> Hash32 -> ExceptT SyncErr IO CausalHash
forall a b. (a -> b) -> a -> b
$ StreamInitInfo -> Hash32
SyncV2.rootCausalHash StreamInitInfo
header
syncFromCodebase ::
Bool ->
Sqlite.Connection ->
(Codebase.Codebase IO v a) ->
CausalHash ->
IO (Either (SyncError SyncV2.PullError) ())
syncFromCodebase :: forall v a.
Bool
-> Connection
-> Codebase IO v a
-> CausalHash
-> IO (Either SyncErr ())
syncFromCodebase Bool
shouldValidate Connection
srcConn Codebase IO v a
destCodebase CausalHash
causalHash = do
Connection -> Transaction () -> IO ()
forall (m :: * -> *) a.
(MonadIO m, HasCallStack) =>
Connection -> Transaction a -> m a
Sqlite.runTransaction Connection
srcConn Transaction ()
Q.clearTempEntityTables
IO (Either SyncErr ()) -> IO (Either SyncErr ())
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either SyncErr ()) -> IO (Either SyncErr ()))
-> (ExceptT SyncErr (ResourceT IO) () -> IO (Either SyncErr ()))
-> ExceptT SyncErr (ResourceT IO) ()
-> IO (Either SyncErr ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ResourceT IO (Either SyncErr ()) -> IO (Either SyncErr ())
forall (m :: * -> *) a. MonadUnliftIO m => ResourceT m a -> m a
C.runResourceT (ResourceT IO (Either SyncErr ()) -> IO (Either SyncErr ()))
-> (ExceptT SyncErr (ResourceT IO) ()
-> ResourceT IO (Either SyncErr ()))
-> ExceptT SyncErr (ResourceT IO) ()
-> IO (Either SyncErr ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ExceptT SyncErr (ResourceT IO) ()
-> ResourceT IO (Either SyncErr ())
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT SyncErr (ResourceT IO) () -> IO (Either SyncErr ()))
-> ExceptT SyncErr (ResourceT IO) () -> IO (Either SyncErr ())
forall a b. (a -> b) -> a -> b
$ Connection
-> CausalHash
-> Maybe BranchRef
-> (Int
-> Stream () DownloadEntitiesChunk
-> ExceptT SyncErr (ResourceT IO) ())
-> ExceptT SyncErr (ResourceT IO) ()
forall (m :: * -> *) r.
MonadIO m =>
Connection
-> CausalHash
-> Maybe BranchRef
-> (Int -> Stream () DownloadEntitiesChunk -> m r)
-> m r
withCodebaseEntityStream Connection
srcConn CausalHash
causalHash Maybe BranchRef
forall a. Maybe a
Nothing \Int
_total Stream () DownloadEntitiesChunk
entityStream -> do
(StreamInitInfo
header, Stream () EntityChunk
rest) <- Stream () DownloadEntitiesChunk
-> StreamM (StreamInitInfo, Stream () EntityChunk)
initializeStream Stream () DownloadEntitiesChunk
entityStream
Bool
-> Codebase IO v a
-> StreamInitInfo
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
forall v a.
Bool
-> Codebase IO v a
-> StreamInitInfo
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
streamIntoCodebase Bool
shouldValidate Codebase IO v a
destCodebase StreamInitInfo
header Stream () EntityChunk
rest
(IO (Either SyncErr ()) -> ResourceT IO (Either SyncErr ()))
-> ExceptT SyncErr IO () -> ExceptT SyncErr (ResourceT IO) ()
forall (m :: * -> *) e a (n :: * -> *) e' b.
(m (Either e a) -> n (Either e' b))
-> ExceptT e m a -> ExceptT e' n b
mapExceptT IO (Either SyncErr ()) -> ResourceT IO (Either SyncErr ())
forall a. IO a -> ResourceT IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Codebase IO v a -> Hash32 -> ExceptT SyncErr IO ()
forall v a. Codebase IO v a -> Hash32 -> ExceptT SyncErr IO ()
afterSyncChecks Codebase IO v a
destCodebase (CausalHash -> Hash32
causalHashToHash32 CausalHash
causalHash))
syncFromCodeserver ::
Bool ->
Servant.BaseUrl ->
SyncV2.BranchRef ->
Share.HashJWT ->
(Int -> IO ()) ->
Cli (Either (SyncError SyncV2.PullError) ())
syncFromCodeserver :: Bool
-> BaseUrl
-> BranchRef
-> HashJWT
-> (Int -> IO ())
-> Cli (Either SyncErr ())
syncFromCodeserver Bool
shouldValidate BaseUrl
unisonShareUrl BranchRef
branchRef HashJWT
hashJwt Int -> IO ()
_downloadedCallback = do
Cli.Env {AuthenticatedHttpClient
authHTTPClient :: AuthenticatedHttpClient
$sel:authHTTPClient:Env :: Env -> AuthenticatedHttpClient
authHTTPClient, Codebase IO Symbol Ann
$sel:codebase:Env :: Env -> Codebase IO Symbol Ann
codebase :: Codebase IO Symbol Ann
codebase} <- Cli Env
forall r (m :: * -> *). MonadReader r m => m r
ask
Transaction () -> Cli ()
forall a. Transaction a -> Cli a
Cli.runTransaction Transaction ()
Q.clearTempEntityTables
ExceptT SyncErr Cli () -> Cli (Either SyncErr ())
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT do
Set Hash32
knownHashes <- Cli (Either SyncErr (Set Hash32))
-> ExceptT SyncErr Cli (Set Hash32)
forall e (m :: * -> *) a. m (Either e a) -> ExceptT e m a
ExceptT (Cli (Either SyncErr (Set Hash32))
-> ExceptT SyncErr Cli (Set Hash32))
-> Cli (Either SyncErr (Set Hash32))
-> ExceptT SyncErr Cli (Set Hash32)
forall a b. (a -> b) -> a -> b
$ BaseUrl
-> BranchRef -> HashJWT -> Cli (Either SyncErr (Set Hash32))
negotiateKnownCausals BaseUrl
unisonShareUrl BranchRef
branchRef HashJWT
hashJwt
let hash :: Hash32
hash = HashJWT -> Hash32
Share.hashJWTHash HashJWT
hashJwt
Cli (Either SyncErr ()) -> ExceptT SyncErr Cli ()
forall e (m :: * -> *) a. m (Either e a) -> ExceptT e m a
ExceptT (Cli (Either SyncErr ()) -> ExceptT SyncErr Cli ())
-> Cli (Either SyncErr ()) -> ExceptT SyncErr Cli ()
forall a b. (a -> b) -> a -> b
$ do
(Transaction (Maybe EntityLocation) -> Cli (Maybe EntityLocation)
forall a. Transaction a -> Cli a
Cli.runTransaction (Hash32 -> Transaction (Maybe EntityLocation)
Q.entityLocation Hash32
hash)) Cli (Maybe EntityLocation)
-> (Maybe EntityLocation -> Cli (Either SyncErr ()))
-> Cli (Either SyncErr ())
forall a b. Cli a -> (a -> Cli b) -> Cli b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Just EntityLocation
Q.EntityInMainStorage -> Either SyncErr () -> Cli (Either SyncErr ())
forall a. a -> Cli a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either SyncErr () -> Cli (Either SyncErr ()))
-> Either SyncErr () -> Cli (Either SyncErr ())
forall a b. (a -> b) -> a -> b
$ () -> Either SyncErr ()
forall a b. b -> Either a b
Right ()
Maybe EntityLocation
_ -> do
FilePath -> Cli (Either SyncErr ()) -> Cli (Either SyncErr ())
forall (m :: * -> *) a. MonadIO m => FilePath -> m a -> m a
Timing.time FilePath
"Entity Download" (Cli (Either SyncErr ()) -> Cli (Either SyncErr ()))
-> Cli (Either SyncErr ()) -> Cli (Either SyncErr ())
forall a b. (a -> b) -> a -> b
$ do
IO (Either SyncErr ()) -> Cli (Either SyncErr ())
forall a. IO a -> Cli a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either SyncErr ()) -> Cli (Either SyncErr ()))
-> (ExceptT SyncErr (ResourceT IO) () -> IO (Either SyncErr ()))
-> ExceptT SyncErr (ResourceT IO) ()
-> Cli (Either SyncErr ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ResourceT IO (Either SyncErr ()) -> IO (Either SyncErr ())
forall (m :: * -> *) a. MonadUnliftIO m => ResourceT m a -> m a
C.runResourceT (ResourceT IO (Either SyncErr ()) -> IO (Either SyncErr ()))
-> (ExceptT SyncErr (ResourceT IO) ()
-> ResourceT IO (Either SyncErr ()))
-> ExceptT SyncErr (ResourceT IO) ()
-> IO (Either SyncErr ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ExceptT SyncErr (ResourceT IO) ()
-> ResourceT IO (Either SyncErr ())
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT SyncErr (ResourceT IO) () -> Cli (Either SyncErr ()))
-> ExceptT SyncErr (ResourceT IO) () -> Cli (Either SyncErr ())
forall a b. (a -> b) -> a -> b
$ AuthenticatedHttpClient
-> BaseUrl
-> DownloadEntitiesRequest
-> (StreamInitInfo
-> Stream () EntityChunk -> ExceptT SyncErr (ResourceT IO) ())
-> ExceptT SyncErr (ResourceT IO) ()
httpStreamEntities
AuthenticatedHttpClient
authHTTPClient
BaseUrl
unisonShareUrl
SyncV2.DownloadEntitiesRequest {BranchRef
branchRef :: BranchRef
$sel:branchRef:DownloadEntitiesRequest :: BranchRef
branchRef, $sel:causalHash:DownloadEntitiesRequest :: HashJWT
causalHash = HashJWT
hashJwt, Set Hash32
knownHashes :: Set Hash32
$sel:knownHashes:DownloadEntitiesRequest :: Set Hash32
knownHashes}
\StreamInitInfo
header Stream () EntityChunk
stream -> do
Bool
-> Codebase IO Symbol Ann
-> StreamInitInfo
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
forall v a.
Bool
-> Codebase IO v a
-> StreamInitInfo
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
streamIntoCodebase Bool
shouldValidate Codebase IO Symbol Ann
codebase StreamInitInfo
header Stream () EntityChunk
stream
(IO (Either SyncErr ()) -> Cli (Either SyncErr ()))
-> ExceptT SyncErr IO () -> ExceptT SyncErr Cli ()
forall (m :: * -> *) e a (n :: * -> *) e' b.
(m (Either e a) -> n (Either e' b))
-> ExceptT e m a -> ExceptT e' n b
mapExceptT IO (Either SyncErr ()) -> Cli (Either SyncErr ())
forall a. IO a -> Cli a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Codebase IO Symbol Ann -> Hash32 -> ExceptT SyncErr IO ()
forall v a. Codebase IO v a -> Hash32 -> ExceptT SyncErr IO ()
afterSyncChecks Codebase IO Symbol Ann
codebase Hash32
hash)
validateAndSave :: Bool -> (Codebase.Codebase IO v a) -> Vector (Hash32, TempEntity) -> StreamM ()
validateAndSave :: forall v a.
Bool
-> Codebase IO v a
-> Vector (Hash32, TempEntity)
-> ExceptT SyncErr (ResourceT IO) ()
validateAndSave Bool
shouldValidate Codebase IO v a
codebase Vector (Hash32, TempEntity)
entities = do
let validateEntities :: IO (Either SyncErr ())
validateEntities =
ExceptT SyncErr IO () -> IO (Either SyncErr ())
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT SyncErr IO () -> IO (Either SyncErr ()))
-> ExceptT SyncErr IO () -> IO (Either SyncErr ())
forall a b. (a -> b) -> a -> b
$ Bool -> ExceptT SyncErr IO () -> ExceptT SyncErr IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
shouldValidate (Vector (Hash32, TempEntity) -> ExceptT SyncErr IO ()
batchValidateEntities Vector (Hash32, TempEntity)
entities)
ResourceT IO (Either SyncErr ())
-> ExceptT SyncErr (ResourceT IO) ()
forall e (m :: * -> *) a. m (Either e a) -> ExceptT e m a
ExceptT (ResourceT IO (Either SyncErr ())
-> ExceptT SyncErr (ResourceT IO) ())
-> (IO (Either SyncErr ()) -> ResourceT IO (Either SyncErr ()))
-> IO (Either SyncErr ())
-> ExceptT SyncErr (ResourceT IO) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO (Either SyncErr ()) -> ResourceT IO (Either SyncErr ())
forall a. IO a -> ResourceT IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either SyncErr ()) -> ExceptT SyncErr (ResourceT IO) ())
-> IO (Either SyncErr ()) -> ExceptT SyncErr (ResourceT IO) ()
forall a b. (a -> b) -> a -> b
$ IO (Either SyncErr ())
-> (Async (Either SyncErr ()) -> IO (Either SyncErr ()))
-> IO (Either SyncErr ())
forall (m :: * -> *) a b.
MonadUnliftIO m =>
m a -> (Async a -> m b) -> m b
IO.withAsync IO (Either SyncErr ())
validateEntities \Async (Either SyncErr ())
validationTask -> do
FilePath -> IO (Either SyncErr ()) -> IO (Either SyncErr ())
forall (m :: * -> *) a. MonadIO m => FilePath -> m a -> m a
Timing.time FilePath
"Inserting entities" (IO (Either SyncErr ()) -> IO (Either SyncErr ()))
-> IO (Either SyncErr ()) -> IO (Either SyncErr ())
forall a b. (a -> b) -> a -> b
$ Codebase IO v a
-> ExceptT SyncErr Transaction () -> IO (Either SyncErr ())
forall (m :: * -> *) v a e b.
MonadIO m =>
Codebase m v a -> ExceptT e Transaction b -> m (Either e b)
Codebase.runTransactionExceptT Codebase IO v a
codebase do
Vector (Hash32, TempEntity)
-> ((Hash32, TempEntity) -> ExceptT SyncErr Transaction ())
-> ExceptT SyncErr Transaction ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ Vector (Hash32, TempEntity)
entities \(Hash32
hash, TempEntity
entity) -> do
ExceptT SyncErr Transaction (Either CausalHashId ObjectId)
-> ExceptT SyncErr Transaction ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (ExceptT SyncErr Transaction (Either CausalHashId ObjectId)
-> ExceptT SyncErr Transaction ())
-> (Transaction (Either CausalHashId ObjectId)
-> ExceptT SyncErr Transaction (Either CausalHashId ObjectId))
-> Transaction (Either CausalHashId ObjectId)
-> ExceptT SyncErr Transaction ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Transaction (Either CausalHashId ObjectId)
-> ExceptT SyncErr Transaction (Either CausalHashId ObjectId)
forall (m :: * -> *) a. Monad m => m a -> ExceptT SyncErr m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (Transaction (Either CausalHashId ObjectId)
-> ExceptT SyncErr Transaction ())
-> Transaction (Either CausalHashId ObjectId)
-> ExceptT SyncErr Transaction ()
forall a b. (a -> b) -> a -> b
$ HashHandle
-> Hash32
-> TempEntity
-> Transaction (Either CausalHashId ObjectId)
Q.saveTempEntityInMain HashHandle
v2HashHandle Hash32
hash TempEntity
entity
Transaction (Either SyncErr ())
-> ExceptT SyncErr Transaction (Either SyncErr ())
forall (m :: * -> *) a. Monad m => m a -> ExceptT SyncErr m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (IO (Either SyncErr ()) -> Transaction (Either SyncErr ())
forall a. HasCallStack => IO a -> Transaction a
Sqlite.unsafeIO (Async (Either SyncErr ()) -> IO (Either SyncErr ())
forall (m :: * -> *) a. MonadIO m => Async a -> m a
IO.wait Async (Either SyncErr ())
validationTask)) ExceptT SyncErr Transaction (Either SyncErr ())
-> (Either SyncErr () -> ExceptT SyncErr Transaction ())
-> ExceptT SyncErr Transaction ()
forall a b.
ExceptT SyncErr Transaction a
-> (a -> ExceptT SyncErr Transaction b)
-> ExceptT SyncErr Transaction b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Left SyncErr
err -> SyncErr -> ExceptT SyncErr Transaction ()
forall a. SyncErr -> ExceptT SyncErr Transaction a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError SyncErr
err
Right ()
_ -> () -> ExceptT SyncErr Transaction ()
forall a. a -> ExceptT SyncErr Transaction a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
batchValidateEntities :: Vector (Hash32, TempEntity) -> ExceptT SyncErr IO ()
batchValidateEntities :: Vector (Hash32, TempEntity) -> ExceptT SyncErr IO ()
batchValidateEntities Vector (Hash32, TempEntity)
entities = do
Vector EntityValidationError
mismatches <- (Vector (Maybe EntityValidationError)
-> Vector EntityValidationError)
-> ExceptT SyncErr IO (Vector (Maybe EntityValidationError))
-> ExceptT SyncErr IO (Vector EntityValidationError)
forall a b.
(a -> b) -> ExceptT SyncErr IO a -> ExceptT SyncErr IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Vector (Maybe EntityValidationError)
-> Vector EntityValidationError
forall a. Vector (Maybe a) -> Vector a
Vector.catMaybes (ExceptT SyncErr IO (Vector (Maybe EntityValidationError))
-> ExceptT SyncErr IO (Vector EntityValidationError))
-> ExceptT SyncErr IO (Vector (Maybe EntityValidationError))
-> ExceptT SyncErr IO (Vector EntityValidationError)
forall a b. (a -> b) -> a -> b
$ IO (Vector (Maybe EntityValidationError))
-> ExceptT SyncErr IO (Vector (Maybe EntityValidationError))
forall a. IO a -> ExceptT SyncErr IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Vector (Maybe EntityValidationError))
-> ExceptT SyncErr IO (Vector (Maybe EntityValidationError)))
-> IO (Vector (Maybe EntityValidationError))
-> ExceptT SyncErr IO (Vector (Maybe EntityValidationError))
forall a b. (a -> b) -> a -> b
$ Vector (Hash32, TempEntity)
-> ((Hash32, TempEntity) -> IO (Maybe EntityValidationError))
-> IO (Vector (Maybe EntityValidationError))
forall (m :: * -> *) (t :: * -> *) a b.
(MonadUnliftIO m, Traversable t) =>
t a -> (a -> m b) -> m (t b)
IO.pooledForConcurrently Vector (Hash32, TempEntity)
entities \(Hash32
hash, TempEntity
entity) -> do
Maybe EntityValidationError -> IO (Maybe EntityValidationError)
forall (m :: * -> *) a. MonadIO m => a -> m a
IO.evaluate (Maybe EntityValidationError -> IO (Maybe EntityValidationError))
-> Maybe EntityValidationError -> IO (Maybe EntityValidationError)
forall a b. (a -> b) -> a -> b
$ Hash32 -> TempEntity -> Maybe EntityValidationError
EV.validateTempEntity Hash32
hash TempEntity
entity
Vector EntityValidationError
-> (EntityValidationError -> ExceptT SyncErr IO ())
-> ExceptT SyncErr IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ Vector EntityValidationError
mismatches \case
err :: EntityValidationError
err@(Share.EntityHashMismatch EntityType
et (Share.HashMismatchForEntity {Hash32
supplied :: Hash32
$sel:supplied:HashMismatchForEntity :: HashMismatchForEntity -> Hash32
supplied, Hash32
computed :: Hash32
$sel:computed:HashMismatchForEntity :: HashMismatchForEntity -> Hash32
computed})) ->
let expectedMismatches :: Map Hash32 Hash32
expectedMismatches = case EntityType
et of
EntityType
Share.TermComponentType -> Map Hash32 Hash32
expectedComponentHashMismatches
EntityType
Share.DeclComponentType -> Map Hash32 Hash32
expectedComponentHashMismatches
EntityType
Share.CausalType -> Map Hash32 Hash32
expectedCausalHashMismatches
EntityType
_ -> Map Hash32 Hash32
forall a. Monoid a => a
mempty
in case Hash32 -> Map Hash32 Hash32 -> Maybe Hash32
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup Hash32
supplied Map Hash32 Hash32
expectedMismatches of
Just Hash32
expected
| Hash32
expected Hash32 -> Hash32 -> Bool
forall a. Eq a => a -> a -> Bool
== Hash32
computed -> () -> ExceptT SyncErr IO ()
forall a. a -> ExceptT SyncErr IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Maybe Hash32
_ -> do
SyncErr -> ExceptT SyncErr IO ()
forall a. SyncErr -> ExceptT SyncErr IO a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> ExceptT SyncErr IO ())
-> (EntityValidationError -> SyncErr)
-> EntityValidationError
-> ExceptT SyncErr IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (EntityValidationError -> PullError)
-> EntityValidationError
-> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DownloadEntitiesError -> PullError
SyncV2.PullError'DownloadEntities (DownloadEntitiesError -> PullError)
-> (EntityValidationError -> DownloadEntitiesError)
-> EntityValidationError
-> PullError
forall b c a. (b -> c) -> (a -> b) -> a -> c
. EntityValidationError -> DownloadEntitiesError
SyncV2.DownloadEntitiesEntityValidationFailure (EntityValidationError -> ExceptT SyncErr IO ())
-> EntityValidationError -> ExceptT SyncErr IO ()
forall a b. (a -> b) -> a -> b
$ EntityValidationError
err
EntityValidationError
err -> do
SyncErr -> ExceptT SyncErr IO ()
forall a. SyncErr -> ExceptT SyncErr IO a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> ExceptT SyncErr IO ())
-> (EntityValidationError -> SyncErr)
-> EntityValidationError
-> ExceptT SyncErr IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (EntityValidationError -> PullError)
-> EntityValidationError
-> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DownloadEntitiesError -> PullError
SyncV2.PullError'DownloadEntities (DownloadEntitiesError -> PullError)
-> (EntityValidationError -> DownloadEntitiesError)
-> EntityValidationError
-> PullError
forall b c a. (b -> c) -> (a -> b) -> a -> c
. EntityValidationError -> DownloadEntitiesError
SyncV2.DownloadEntitiesEntityValidationFailure (EntityValidationError -> ExceptT SyncErr IO ())
-> EntityValidationError -> ExceptT SyncErr IO ()
forall a b. (a -> b) -> a -> b
$ EntityValidationError
err
syncUnsortedStream ::
Bool ->
(Codebase.Codebase IO v a) ->
Stream () SyncV2.EntityChunk ->
StreamM ()
syncUnsortedStream :: forall v a.
Bool
-> Codebase IO v a
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
syncUnsortedStream Bool
shouldValidate Codebase IO v a
codebase Stream () EntityChunk
stream = do
Vector (Hash32, TempEntity)
allEntities <-
ConduitT
()
Void
(ExceptT SyncErr (ResourceT IO))
(Vector (Hash32, TempEntity))
-> ExceptT SyncErr (ResourceT IO) (Vector (Hash32, TempEntity))
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
C.runConduit (ConduitT
()
Void
(ExceptT SyncErr (ResourceT IO))
(Vector (Hash32, TempEntity))
-> ExceptT SyncErr (ResourceT IO) (Vector (Hash32, TempEntity)))
-> ConduitT
()
Void
(ExceptT SyncErr (ResourceT IO))
(Vector (Hash32, TempEntity))
-> ExceptT SyncErr (ResourceT IO) (Vector (Hash32, TempEntity))
forall a b. (a -> b) -> a -> b
$
Stream () EntityChunk
stream
Stream () EntityChunk
-> ConduitT
EntityChunk
Void
(ExceptT SyncErr (ResourceT IO))
(Vector (Hash32, TempEntity))
-> ConduitT
()
Void
(ExceptT SyncErr (ResourceT IO))
(Vector (Hash32, TempEntity))
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| Int
-> ConduitT
EntityChunk [EntityChunk] (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a. Monad m => Int -> ConduitT a [a] m ()
CL.chunksOf Int
batchSize
ConduitT
EntityChunk [EntityChunk] (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT
[EntityChunk]
Void
(ExceptT SyncErr (ResourceT IO))
(Vector (Hash32, TempEntity))
-> ConduitT
EntityChunk
Void
(ExceptT SyncErr (ResourceT IO))
(Vector (Hash32, TempEntity))
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| Codebase IO v a
-> Stream [EntityChunk] (Vector (Hash32, TempEntity))
forall v a.
Codebase IO v a
-> Stream [EntityChunk] (Vector (Hash32, TempEntity))
unpackChunks Codebase IO v a
codebase
Stream [EntityChunk] (Vector (Hash32, TempEntity))
-> ConduitT
(Vector (Hash32, TempEntity))
Void
(ExceptT SyncErr (ResourceT IO))
(Vector (Hash32, TempEntity))
-> ConduitT
[EntityChunk]
Void
(ExceptT SyncErr (ResourceT IO))
(Vector (Hash32, TempEntity))
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| Stream (Vector (Hash32, TempEntity)) (Vector (Hash32, TempEntity))
validateBatch
Stream (Vector (Hash32, TempEntity)) (Vector (Hash32, TempEntity))
-> ConduitT
(Vector (Hash32, TempEntity))
Void
(ExceptT SyncErr (ResourceT IO))
(Vector (Hash32, TempEntity))
-> ConduitT
(Vector (Hash32, TempEntity))
Void
(ExceptT SyncErr (ResourceT IO))
(Vector (Hash32, TempEntity))
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| ConduitT
(Vector (Hash32, TempEntity))
(Hash32, TempEntity)
(ExceptT SyncErr (ResourceT IO))
()
ConduitT
(Vector (Hash32, TempEntity))
(Element (Vector (Hash32, TempEntity)))
(ExceptT SyncErr (ResourceT IO))
()
forall (m :: * -> *) mono.
(Monad m, MonoFoldable mono) =>
ConduitT mono (Element mono) m ()
C.concat
ConduitT
(Vector (Hash32, TempEntity))
(Hash32, TempEntity)
(ExceptT SyncErr (ResourceT IO))
()
-> ConduitT
(Hash32, TempEntity)
Void
(ExceptT SyncErr (ResourceT IO))
(Vector (Hash32, TempEntity))
-> ConduitT
(Vector (Hash32, TempEntity))
Void
(ExceptT SyncErr (ResourceT IO))
(Vector (Hash32, TempEntity))
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| forall (v :: * -> *) a (m :: * -> *) o.
(Vector v a, PrimMonad m) =>
ConduitT a o m (v a)
C.sinkVector @Vector
let sortedEntities :: [(Hash32, TempEntity)]
sortedEntities = Vector (Hash32, TempEntity) -> [(Hash32, TempEntity)]
forall (f :: * -> *).
(Foldable f, Functor f) =>
f (Hash32, TempEntity) -> [(Hash32, TempEntity)]
sortDependencyFirst Vector (Hash32, TempEntity)
allEntities
IO () -> ExceptT SyncErr (ResourceT IO) ()
forall a. IO a -> ExceptT SyncErr (ResourceT IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ExceptT SyncErr (ResourceT IO) ())
-> IO () -> ExceptT SyncErr (ResourceT IO) ()
forall a b. (a -> b) -> a -> b
$ Maybe Int -> ((Int -> IO ()) -> IO ()) -> IO ()
forall (m :: * -> *) a.
MonadUnliftIO m =>
Maybe Int -> ((Int -> m ()) -> m a) -> m a
withEntitySavingCallback (Int -> Maybe Int
forall a. a -> Maybe a
Just (Int -> Maybe Int) -> Int -> Maybe Int
forall a b. (a -> b) -> a -> b
$ Vector (Hash32, TempEntity) -> Int
forall a. Vector a -> Int
Vector.length Vector (Hash32, TempEntity)
allEntities) \Int -> IO ()
countC -> do
Codebase IO v a -> Transaction () -> IO ()
forall (m :: * -> *) v a b.
MonadIO m =>
Codebase m v a -> Transaction b -> m b
Codebase.runTransaction Codebase IO v a
codebase (Transaction () -> IO ()) -> Transaction () -> IO ()
forall a b. (a -> b) -> a -> b
$ [(Hash32, TempEntity)]
-> ((Hash32, TempEntity)
-> Transaction (Either CausalHashId ObjectId))
-> Transaction ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ [(Hash32, TempEntity)]
sortedEntities \(Hash32
hash, TempEntity
entity) -> do
Either CausalHashId ObjectId
r <- HashHandle
-> Hash32
-> TempEntity
-> Transaction (Either CausalHashId ObjectId)
Q.saveTempEntityInMain HashHandle
v2HashHandle Hash32
hash TempEntity
entity
IO () -> Transaction ()
forall a. HasCallStack => IO a -> Transaction a
Sqlite.unsafeIO (IO () -> Transaction ()) -> IO () -> Transaction ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
countC Int
1
pure Either CausalHashId ObjectId
r
where
validateBatch :: Stream (Vector (Hash32, TempEntity)) (Vector (Hash32, TempEntity))
validateBatch :: Stream (Vector (Hash32, TempEntity)) (Vector (Hash32, TempEntity))
validateBatch = (Vector (Hash32, TempEntity) -> ExceptT SyncErr (ResourceT IO) ())
-> Stream
(Vector (Hash32, TempEntity)) (Vector (Hash32, TempEntity))
forall (m :: * -> *) a. Monad m => (a -> m ()) -> ConduitT a a m ()
C.iterM \Vector (Hash32, TempEntity)
entities -> do
Bool
-> ExceptT SyncErr (ResourceT IO) ()
-> ExceptT SyncErr (ResourceT IO) ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
shouldValidate ((IO (Either SyncErr ()) -> ResourceT IO (Either SyncErr ()))
-> ExceptT SyncErr IO () -> ExceptT SyncErr (ResourceT IO) ()
forall (m :: * -> *) e a (n :: * -> *) e' b.
(m (Either e a) -> n (Either e' b))
-> ExceptT e m a -> ExceptT e' n b
mapExceptT IO (Either SyncErr ()) -> ResourceT IO (Either SyncErr ())
forall (m :: * -> *) a. Monad m => m a -> ResourceT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ExceptT SyncErr IO () -> ExceptT SyncErr (ResourceT IO) ())
-> ExceptT SyncErr IO () -> ExceptT SyncErr (ResourceT IO) ()
forall a b. (a -> b) -> a -> b
$ Vector (Hash32, TempEntity) -> ExceptT SyncErr IO ()
batchValidateEntities Vector (Hash32, TempEntity)
entities)
syncSortedStream ::
Bool ->
(Codebase.Codebase IO v a) ->
Stream () SyncV2.EntityChunk ->
StreamM ()
syncSortedStream :: forall v a.
Bool
-> Codebase IO v a
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
syncSortedStream Bool
shouldValidate Codebase IO v a
codebase Stream () EntityChunk
stream = do
let handler :: Stream (Vector (Hash32, TempEntity)) o
handler :: forall o. Stream (Vector (Hash32, TempEntity)) o
handler = (Vector (Hash32, TempEntity) -> ExceptT SyncErr (ResourceT IO) ())
-> ConduitT
(Vector (Hash32, TempEntity)) o (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a o.
Monad m =>
(a -> m ()) -> ConduitT a o m ()
C.mapM_C \Vector (Hash32, TempEntity)
entityBatch -> do
Bool
-> Codebase IO v a
-> Vector (Hash32, TempEntity)
-> ExceptT SyncErr (ResourceT IO) ()
forall v a.
Bool
-> Codebase IO v a
-> Vector (Hash32, TempEntity)
-> ExceptT SyncErr (ResourceT IO) ()
validateAndSave Bool
shouldValidate Codebase IO v a
codebase Vector (Hash32, TempEntity)
entityBatch
ConduitT () Void (ExceptT SyncErr (ResourceT IO)) ()
-> ExceptT SyncErr (ResourceT IO) ()
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
C.runConduit (ConduitT () Void (ExceptT SyncErr (ResourceT IO)) ()
-> ExceptT SyncErr (ResourceT IO) ())
-> ConduitT () Void (ExceptT SyncErr (ResourceT IO)) ()
-> ExceptT SyncErr (ResourceT IO) ()
forall a b. (a -> b) -> a -> b
$
Stream () EntityChunk
stream
Stream () EntityChunk
-> ConduitT EntityChunk Void (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT () Void (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| Int
-> ConduitT
EntityChunk [EntityChunk] (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a. Monad m => Int -> ConduitT a [a] m ()
CL.chunksOf Int
batchSize
ConduitT
EntityChunk [EntityChunk] (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT [EntityChunk] Void (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT EntityChunk Void (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| Codebase IO v a
-> Stream [EntityChunk] (Vector (Hash32, TempEntity))
forall v a.
Codebase IO v a
-> Stream [EntityChunk] (Vector (Hash32, TempEntity))
unpackChunks Codebase IO v a
codebase
Stream [EntityChunk] (Vector (Hash32, TempEntity))
-> ConduitT
(Vector (Hash32, TempEntity))
Void
(ExceptT SyncErr (ResourceT IO))
()
-> ConduitT [EntityChunk] Void (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| ConduitT
(Vector (Hash32, TempEntity))
Void
(ExceptT SyncErr (ResourceT IO))
()
forall o. Stream (Vector (Hash32, TempEntity)) o
handler
sortDependencyFirst :: (Foldable f, Functor f) => f (Hash32, TempEntity) -> [(Hash32, TempEntity)]
sortDependencyFirst :: forall (f :: * -> *).
(Foldable f, Functor f) =>
f (Hash32, TempEntity) -> [(Hash32, TempEntity)]
sortDependencyFirst f (Hash32, TempEntity)
entities = do
let adjList :: f ((Hash32, TempEntity), Hash32, [Hash32])
adjList = f (Hash32, TempEntity)
entities f (Hash32, TempEntity)
-> ((Hash32, TempEntity)
-> ((Hash32, TempEntity), Hash32, [Hash32]))
-> f ((Hash32, TempEntity), Hash32, [Hash32])
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \(Hash32
hash32, TempEntity
entity) -> ((Hash32
hash32, TempEntity
entity), Hash32
hash32, Set Hash32 -> [Hash32]
forall a. Set a -> [a]
Set.toList (Set Hash32 -> [Hash32]) -> Set Hash32 -> [Hash32]
forall a b. (a -> b) -> a -> b
$ Entity Text Hash32 Hash32 -> Set Hash32
forall hash text noSyncHash.
Ord hash =>
Entity text noSyncHash hash -> Set hash
Share.entityDependencies (TempEntity -> Entity Text Hash32 Hash32
tempEntityToEntity TempEntity
entity))
(Graph
graph, Int -> ((Hash32, TempEntity), Hash32, [Hash32])
vertexInfo, Hash32 -> Maybe Int
_vertexForKey) = [((Hash32, TempEntity), Hash32, [Hash32])]
-> (Graph, Int -> ((Hash32, TempEntity), Hash32, [Hash32]),
Hash32 -> Maybe Int)
forall key node.
Ord key =>
[(node, key, [key])]
-> (Graph, Int -> (node, key, [key]), key -> Maybe Int)
Graph.graphFromEdges (f ((Hash32, TempEntity), Hash32, [Hash32])
-> [((Hash32, TempEntity), Hash32, [Hash32])]
forall a. f a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
Foldable.toList f ((Hash32, TempEntity), Hash32, [Hash32])
adjList)
in Graph -> [Int]
Graph.reverseTopSort Graph
graph [Int] -> (Int -> (Hash32, TempEntity)) -> [(Hash32, TempEntity)]
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \Int
v -> (Getting
(Hash32, TempEntity)
((Hash32, TempEntity), Hash32, [Hash32])
(Hash32, TempEntity)
-> ((Hash32, TempEntity), Hash32, [Hash32]) -> (Hash32, TempEntity)
forall s (m :: * -> *) a. MonadReader s m => Getting a s a -> m a
view Getting
(Hash32, TempEntity)
((Hash32, TempEntity), Hash32, [Hash32])
(Hash32, TempEntity)
forall s t a b. Field1 s t a b => Lens s t a b
Lens
((Hash32, TempEntity), Hash32, [Hash32])
((Hash32, TempEntity), Hash32, [Hash32])
(Hash32, TempEntity)
(Hash32, TempEntity)
_1 (((Hash32, TempEntity), Hash32, [Hash32]) -> (Hash32, TempEntity))
-> ((Hash32, TempEntity), Hash32, [Hash32]) -> (Hash32, TempEntity)
forall a b. (a -> b) -> a -> b
$ Int -> ((Hash32, TempEntity), Hash32, [Hash32])
vertexInfo Int
v)
unpackChunk :: SyncV2.EntityChunk -> ExceptT SyncErr Sqlite.Transaction (Maybe (Hash32, TempEntity))
unpackChunk :: EntityChunk
-> ExceptT SyncErr Transaction (Maybe (Hash32, TempEntity))
unpackChunk = \case
SyncV2.EntityChunk {Hash32
hash :: Hash32
$sel:hash:EntityChunk :: EntityChunk -> Hash32
hash, $sel:entityCBOR:EntityChunk :: EntityChunk -> CBORBytes TempEntity
entityCBOR = CBORBytes TempEntity
entityBytes} -> do
Transaction (Maybe EntityLocation)
-> ExceptT SyncErr Transaction (Maybe EntityLocation)
forall (m :: * -> *) a. Monad m => m a -> ExceptT SyncErr m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (Hash32 -> Transaction (Maybe EntityLocation)
Q.entityLocation Hash32
hash) ExceptT SyncErr Transaction (Maybe EntityLocation)
-> (Maybe EntityLocation
-> ExceptT SyncErr Transaction (Maybe (Hash32, TempEntity)))
-> ExceptT SyncErr Transaction (Maybe (Hash32, TempEntity))
forall a b.
ExceptT SyncErr Transaction a
-> (a -> ExceptT SyncErr Transaction b)
-> ExceptT SyncErr Transaction b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Just EntityLocation
Q.EntityInMainStorage -> Maybe (Hash32, TempEntity)
-> ExceptT SyncErr Transaction (Maybe (Hash32, TempEntity))
forall a. a -> ExceptT SyncErr Transaction a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (Hash32, TempEntity)
forall a. Maybe a
Nothing
Maybe EntityLocation
_ -> do
((Hash32, TempEntity) -> Maybe (Hash32, TempEntity)
forall a. a -> Maybe a
Just ((Hash32, TempEntity) -> Maybe (Hash32, TempEntity))
-> (TempEntity -> (Hash32, TempEntity))
-> TempEntity
-> Maybe (Hash32, TempEntity)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Hash32
hash,)) (TempEntity -> Maybe (Hash32, TempEntity))
-> ExceptT SyncErr Transaction TempEntity
-> ExceptT SyncErr Transaction (Maybe (Hash32, TempEntity))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> CBORBytes TempEntity -> ExceptT SyncErr Transaction TempEntity
unpackEntity CBORBytes TempEntity
entityBytes
where
unpackEntity :: (CBORBytes TempEntity) -> ExceptT SyncErr Sqlite.Transaction TempEntity
unpackEntity :: CBORBytes TempEntity -> ExceptT SyncErr Transaction TempEntity
unpackEntity CBORBytes TempEntity
entityBytes = do
case CBORBytes TempEntity -> Either DeserialiseFailure TempEntity
forall t. Serialise t => CBORBytes t -> Either DeserialiseFailure t
CBOR.deserialiseOrFailCBORBytes CBORBytes TempEntity
entityBytes of
Left DeserialiseFailure
err -> do SyncErr -> ExceptT SyncErr Transaction TempEntity
forall a. SyncErr -> ExceptT SyncErr Transaction a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> ExceptT SyncErr Transaction TempEntity)
-> SyncErr -> ExceptT SyncErr Transaction TempEntity
forall a b. (a -> b) -> a -> b
$ (PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (SyncError -> PullError) -> SyncError -> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncError -> PullError
SyncV2.PullError'Sync (SyncError -> SyncErr) -> SyncError -> SyncErr
forall a b. (a -> b) -> a -> b
$ DeserialiseFailure -> SyncError
SyncV2.SyncErrorDeserializationFailure DeserialiseFailure
err)
Right TempEntity
entity -> TempEntity -> ExceptT SyncErr Transaction TempEntity
forall a. a -> ExceptT SyncErr Transaction a
forall (f :: * -> *) a. Applicative f => a -> f a
pure TempEntity
entity
unpackChunks :: Codebase.Codebase IO v a -> Stream [SyncV2.EntityChunk] (Vector (Hash32, TempEntity))
unpackChunks :: forall v a.
Codebase IO v a
-> Stream [EntityChunk] (Vector (Hash32, TempEntity))
unpackChunks Codebase IO v a
codebase = ([EntityChunk]
-> ExceptT SyncErr (ResourceT IO) (Vector (Hash32, TempEntity)))
-> Stream [EntityChunk] (Vector (Hash32, TempEntity))
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> ConduitT a b m ()
C.mapM \[EntityChunk]
xs -> ResourceT IO (Either SyncErr (Vector (Hash32, TempEntity)))
-> ExceptT SyncErr (ResourceT IO) (Vector (Hash32, TempEntity))
forall e (m :: * -> *) a. m (Either e a) -> ExceptT e m a
ExceptT (ResourceT IO (Either SyncErr (Vector (Hash32, TempEntity)))
-> ExceptT SyncErr (ResourceT IO) (Vector (Hash32, TempEntity)))
-> (ExceptT SyncErr Transaction (Vector (Hash32, TempEntity))
-> ResourceT IO (Either SyncErr (Vector (Hash32, TempEntity))))
-> ExceptT SyncErr Transaction (Vector (Hash32, TempEntity))
-> ExceptT SyncErr (ResourceT IO) (Vector (Hash32, TempEntity))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO (Either SyncErr (Vector (Hash32, TempEntity)))
-> ResourceT IO (Either SyncErr (Vector (Hash32, TempEntity)))
forall (m :: * -> *) a. Monad m => m a -> ResourceT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (IO (Either SyncErr (Vector (Hash32, TempEntity)))
-> ResourceT IO (Either SyncErr (Vector (Hash32, TempEntity))))
-> (ExceptT SyncErr Transaction (Vector (Hash32, TempEntity))
-> IO (Either SyncErr (Vector (Hash32, TempEntity))))
-> ExceptT SyncErr Transaction (Vector (Hash32, TempEntity))
-> ResourceT IO (Either SyncErr (Vector (Hash32, TempEntity)))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Codebase IO v a
-> ExceptT SyncErr Transaction (Vector (Hash32, TempEntity))
-> IO (Either SyncErr (Vector (Hash32, TempEntity)))
forall (m :: * -> *) v a e b.
MonadIO m =>
Codebase m v a -> ExceptT e Transaction b -> m (Either e b)
Codebase.runTransactionExceptT Codebase IO v a
codebase (ExceptT SyncErr Transaction (Vector (Hash32, TempEntity))
-> ExceptT SyncErr (ResourceT IO) (Vector (Hash32, TempEntity)))
-> ExceptT SyncErr Transaction (Vector (Hash32, TempEntity))
-> ExceptT SyncErr (ResourceT IO) (Vector (Hash32, TempEntity))
forall a b. (a -> b) -> a -> b
$ do
[EntityChunk]
-> (EntityChunk
-> ExceptT SyncErr Transaction (Maybe (Hash32, TempEntity)))
-> ExceptT SyncErr Transaction [Maybe (Hash32, TempEntity)]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
t a -> (a -> f b) -> f (t b)
for [EntityChunk]
xs EntityChunk
-> ExceptT SyncErr Transaction (Maybe (Hash32, TempEntity))
unpackChunk
ExceptT SyncErr Transaction [Maybe (Hash32, TempEntity)]
-> ([Maybe (Hash32, TempEntity)] -> [(Hash32, TempEntity)])
-> ExceptT SyncErr Transaction [(Hash32, TempEntity)]
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> [Maybe (Hash32, TempEntity)] -> [(Hash32, TempEntity)]
forall a. [Maybe a] -> [a]
catMaybes
ExceptT SyncErr Transaction [(Hash32, TempEntity)]
-> ([(Hash32, TempEntity)] -> Vector (Hash32, TempEntity))
-> ExceptT SyncErr Transaction (Vector (Hash32, TempEntity))
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> [(Hash32, TempEntity)] -> Vector (Hash32, TempEntity)
forall a. [a] -> Vector a
Vector.fromList
streamIntoCodebase ::
Bool ->
Codebase.Codebase IO v a ->
SyncV2.StreamInitInfo ->
Stream () SyncV2.EntityChunk ->
StreamM ()
streamIntoCodebase :: forall v a.
Bool
-> Codebase IO v a
-> StreamInitInfo
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
streamIntoCodebase Bool
shouldValidate Codebase IO v a
codebase SyncV2.StreamInitInfo {Version
version :: Version
$sel:version:StreamInitInfo :: StreamInitInfo -> Version
version, EntitySorting
entitySorting :: EntitySorting
$sel:entitySorting:StreamInitInfo :: StreamInitInfo -> EntitySorting
entitySorting, $sel:numEntities:StreamInitInfo :: StreamInitInfo -> Maybe Word64
numEntities = Maybe Word64
numEntities} Stream () EntityChunk
stream = ResourceT IO (Either SyncErr ())
-> ExceptT SyncErr (ResourceT IO) ()
forall e (m :: * -> *) a. m (Either e a) -> ExceptT e m a
ExceptT do
Maybe Int
-> (ConduitT
EntityChunk EntityChunk (ExceptT SyncErr (ResourceT IO)) ()
-> ResourceT IO (Either SyncErr ()))
-> ResourceT IO (Either SyncErr ())
forall (m :: * -> *) (n :: * -> *) i a.
(MonadIO m, MonadUnliftIO n) =>
Maybe Int -> (ConduitT i i m () -> n a) -> n a
withStreamProgressCallback (Word64 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Word64 -> Int) -> Maybe Word64 -> Maybe Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe Word64
numEntities) \ConduitT
EntityChunk EntityChunk (ExceptT SyncErr (ResourceT IO)) ()
countC -> ExceptT SyncErr (ResourceT IO) ()
-> ResourceT IO (Either SyncErr ())
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT do
let stream' :: Stream () EntityChunk
stream' = Stream () EntityChunk
stream Stream () EntityChunk
-> ConduitT
EntityChunk EntityChunk (ExceptT SyncErr (ResourceT IO)) ()
-> Stream () EntityChunk
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| ConduitT
EntityChunk EntityChunk (ExceptT SyncErr (ResourceT IO)) ()
countC
case Version
version of
(SyncV2.Version Word16
1) -> () -> ExceptT SyncErr (ResourceT IO) ()
forall a. a -> ExceptT SyncErr (ResourceT IO) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Version
v -> SyncErr -> ExceptT SyncErr (ResourceT IO) ()
forall a. SyncErr -> ExceptT SyncErr (ResourceT IO) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> ExceptT SyncErr (ResourceT IO) ())
-> (SyncError -> SyncErr)
-> SyncError
-> ExceptT SyncErr (ResourceT IO) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (SyncError -> PullError) -> SyncError -> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncError -> PullError
SyncV2.PullError'Sync (SyncError -> ExceptT SyncErr (ResourceT IO) ())
-> SyncError -> ExceptT SyncErr (ResourceT IO) ()
forall a b. (a -> b) -> a -> b
$ Version -> SyncError
SyncV2.SyncErrorUnsupportedVersion Version
v
case EntitySorting
entitySorting of
EntitySorting
SyncV2.DependenciesFirst -> Bool
-> Codebase IO v a
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
forall v a.
Bool
-> Codebase IO v a
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
syncSortedStream Bool
shouldValidate Codebase IO v a
codebase Stream () EntityChunk
stream'
EntitySorting
SyncV2.Unsorted -> Bool
-> Codebase IO v a
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
forall v a.
Bool
-> Codebase IO v a
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
syncUnsortedStream Bool
shouldValidate Codebase IO v a
codebase Stream () EntityChunk
stream'
afterSyncChecks :: Codebase.Codebase IO v a -> Hash32 -> ExceptT (SyncError SyncV2.PullError) IO ()
afterSyncChecks :: forall v a. Codebase IO v a -> Hash32 -> ExceptT SyncErr IO ()
afterSyncChecks Codebase IO v a
codebase Hash32
hash = do
IO Bool -> ExceptT SyncErr IO Bool
forall (m :: * -> *) a. Monad m => m a -> ExceptT SyncErr m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (Codebase IO v a -> Hash32 -> IO Bool
forall v a. Codebase IO v a -> Hash32 -> IO Bool
didCausalSuccessfullyImport Codebase IO v a
codebase Hash32
hash) ExceptT SyncErr IO Bool
-> (Bool -> ExceptT SyncErr IO ()) -> ExceptT SyncErr IO ()
forall a b.
ExceptT SyncErr IO a
-> (a -> ExceptT SyncErr IO b) -> ExceptT SyncErr IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Bool
False -> do
SyncErr -> ExceptT SyncErr IO ()
forall a. SyncErr -> ExceptT SyncErr IO a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (PullError -> SyncErr
forall e. e -> SyncError e
SyncError (SyncError -> PullError
SyncV2.PullError'Sync (SyncError -> PullError)
-> (Hash32 -> SyncError) -> Hash32 -> PullError
forall b c a. (b -> c) -> (a -> b) -> a -> c
. CausalHash -> SyncError
SyncV2.SyncErrorExpectedResultNotInMain (CausalHash -> SyncError)
-> (Hash32 -> CausalHash) -> Hash32 -> SyncError
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Hash32 -> CausalHash
hash32ToCausalHash (Hash32 -> PullError) -> Hash32 -> PullError
forall a b. (a -> b) -> a -> b
$ Hash32
hash))
Bool
True -> () -> ExceptT SyncErr IO ()
forall a. a -> ExceptT SyncErr IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
ExceptT SyncErr IO Bool -> ExceptT SyncErr IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (ExceptT SyncErr IO Bool -> ExceptT SyncErr IO ())
-> ExceptT SyncErr IO Bool -> ExceptT SyncErr IO ()
forall a b. (a -> b) -> a -> b
$ IO Bool -> ExceptT SyncErr IO Bool
forall a. IO a -> ExceptT SyncErr IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Codebase IO v a -> forall x. (Connection -> IO x) -> IO x
forall (m :: * -> *) v a.
Codebase m v a -> forall x. (Connection -> m x) -> m x
Codebase.withConnection Codebase IO v a
codebase Connection -> IO Bool
Sqlite.vacuum)
where
didCausalSuccessfullyImport :: Codebase.Codebase IO v a -> Hash32 -> IO Bool
didCausalSuccessfullyImport :: forall v a. Codebase IO v a -> Hash32 -> IO Bool
didCausalSuccessfullyImport Codebase IO v a
codebase Hash32
hash = do
let expectedHash :: CausalHash
expectedHash = Hash32 -> CausalHash
hash32ToCausalHash Hash32
hash
Maybe (CausalHashId, BranchHashId) -> Bool
forall a. Maybe a -> Bool
isJust (Maybe (CausalHashId, BranchHashId) -> Bool)
-> IO (Maybe (CausalHashId, BranchHashId)) -> IO Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (Codebase IO v a
-> Transaction (Maybe (CausalHashId, BranchHashId))
-> IO (Maybe (CausalHashId, BranchHashId))
forall (m :: * -> *) v a b.
MonadIO m =>
Codebase m v a -> Transaction b -> m b
Codebase.runTransaction Codebase IO v a
codebase (Transaction (Maybe (CausalHashId, BranchHashId))
-> IO (Maybe (CausalHashId, BranchHashId)))
-> Transaction (Maybe (CausalHashId, BranchHashId))
-> IO (Maybe (CausalHashId, BranchHashId))
forall a b. (a -> b) -> a -> b
$ CausalHash -> Transaction (Maybe (CausalHashId, BranchHashId))
Q.loadCausalByCausalHash CausalHash
expectedHash)
withCodebaseEntityStream ::
(MonadIO m) =>
Sqlite.Connection ->
CausalHash ->
Maybe SyncV2.BranchRef ->
(Int -> Stream () SyncV2.DownloadEntitiesChunk -> m r) ->
m r
withCodebaseEntityStream :: forall (m :: * -> *) r.
MonadIO m =>
Connection
-> CausalHash
-> Maybe BranchRef
-> (Int -> Stream () DownloadEntitiesChunk -> m r)
-> m r
withCodebaseEntityStream Connection
conn CausalHash
rootHash Maybe BranchRef
mayBranchRef Int -> Stream () DownloadEntitiesChunk -> m r
callback = do
Map Hash32 (Entity Text Hash32 Hash32)
entities <- IO (Map Hash32 (Entity Text Hash32 Hash32))
-> m (Map Hash32 (Entity Text Hash32 Hash32))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Map Hash32 (Entity Text Hash32 Hash32))
-> m (Map Hash32 (Entity Text Hash32 Hash32)))
-> IO (Map Hash32 (Entity Text Hash32 Hash32))
-> m (Map Hash32 (Entity Text Hash32 Hash32))
forall a b. (a -> b) -> a -> b
$ ((Int -> IO ()) -> IO (Map Hash32 (Entity Text Hash32 Hash32)))
-> IO (Map Hash32 (Entity Text Hash32 Hash32))
forall (m :: * -> *) a.
MonadUnliftIO m =>
((Int -> m ()) -> m a) -> m a
withEntityLoadingCallback (((Int -> IO ()) -> IO (Map Hash32 (Entity Text Hash32 Hash32)))
-> IO (Map Hash32 (Entity Text Hash32 Hash32)))
-> ((Int -> IO ()) -> IO (Map Hash32 (Entity Text Hash32 Hash32)))
-> IO (Map Hash32 (Entity Text Hash32 Hash32))
forall a b. (a -> b) -> a -> b
$ \Int -> IO ()
counter -> do
Connection
-> Transaction (Map Hash32 (Entity Text Hash32 Hash32))
-> IO (Map Hash32 (Entity Text Hash32 Hash32))
forall (m :: * -> *) a.
(MonadIO m, HasCallStack) =>
Connection -> Transaction a -> m a
Sqlite.runTransaction Connection
conn (CausalHash
-> (Int -> IO ())
-> Transaction (Map Hash32 (Entity Text Hash32 Hash32))
depsForCausal CausalHash
rootHash Int -> IO ()
counter)
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Handle -> Text -> IO ()
Text.hPutStrLn Handle
IO.stderr (Text -> IO ()) -> Text -> IO ()
forall a b. (a -> b) -> a -> b
$ Text
"Finished loading entities, writing sync-file."
let totalEntities :: Int
totalEntities = Int -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Int) -> Int -> Int
forall a b. (a -> b) -> a -> b
$ Map Hash32 (Entity Text Hash32 Hash32) -> Int
forall k a. Map k a -> Int
Map.size Map Hash32 (Entity Text Hash32 Hash32)
entities
let initialChunk :: DownloadEntitiesChunk
initialChunk =
StreamInitInfo -> DownloadEntitiesChunk
SyncV2.InitialC
( SyncV2.StreamInitInfo
{ $sel:rootCausalHash:StreamInitInfo :: Hash32
rootCausalHash = CausalHash -> Hash32
causalHashToHash32 CausalHash
rootHash,
$sel:version:StreamInitInfo :: Version
version = Word16 -> Version
SyncV2.Version Word16
1,
$sel:entitySorting:StreamInitInfo :: EntitySorting
entitySorting = EntitySorting
SyncV2.DependenciesFirst,
$sel:numEntities:StreamInitInfo :: Maybe Word64
numEntities = Word64 -> Maybe Word64
forall a. a -> Maybe a
Just (Word64 -> Maybe Word64) -> Word64 -> Maybe Word64
forall a b. (a -> b) -> a -> b
$ Int -> Word64
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
totalEntities,
$sel:rootBranchRef:StreamInitInfo :: Maybe BranchRef
rootBranchRef = Maybe BranchRef
mayBranchRef
}
)
let contents :: [DownloadEntitiesChunk]
contents =
Map Hash32 (Entity Text Hash32 Hash32)
entities
Map Hash32 (Entity Text Hash32 Hash32)
-> (Map Hash32 (Entity Text Hash32 Hash32)
-> Map Hash32 TempEntity)
-> Map Hash32 TempEntity
forall a b. a -> (a -> b) -> b
& (Entity Text Hash32 Hash32 -> TempEntity)
-> Map Hash32 (Entity Text Hash32 Hash32) -> Map Hash32 TempEntity
forall a b. (a -> b) -> Map Hash32 a -> Map Hash32 b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((Hash32 -> Hash32) -> Entity Text Hash32 Hash32 -> TempEntity
forall hash.
(hash -> Hash32) -> Entity Text Hash32 hash -> TempEntity
Sync.entityToTempEntity Hash32 -> Hash32
forall a. a -> a
id)
Map Hash32 TempEntity
-> (Map Hash32 TempEntity -> [(Hash32, TempEntity)])
-> [(Hash32, TempEntity)]
forall a b. a -> (a -> b) -> b
& Map Hash32 TempEntity -> [(Hash32, TempEntity)]
forall k a. Map k a -> [(k, a)]
Map.toList
[(Hash32, TempEntity)]
-> ([(Hash32, TempEntity)] -> [(Hash32, TempEntity)])
-> [(Hash32, TempEntity)]
forall a b. a -> (a -> b) -> b
& [(Hash32, TempEntity)] -> [(Hash32, TempEntity)]
forall (f :: * -> *).
(Foldable f, Functor f) =>
f (Hash32, TempEntity) -> [(Hash32, TempEntity)]
sortDependencyFirst
[(Hash32, TempEntity)]
-> ([(Hash32, TempEntity)] -> [DownloadEntitiesChunk])
-> [DownloadEntitiesChunk]
forall a b. a -> (a -> b) -> b
& ( ((Hash32, TempEntity) -> DownloadEntitiesChunk)
-> [(Hash32, TempEntity)] -> [DownloadEntitiesChunk]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap \(Hash32
hash, TempEntity
entity) ->
let entityCBOR :: CBORBytes TempEntity
entityCBOR = (TempEntity -> CBORBytes TempEntity
forall t. Serialise t => t -> CBORBytes t
CBOR.serialiseCBORBytes TempEntity
entity)
in EntityChunk -> DownloadEntitiesChunk
SyncV2.EntityC (SyncV2.EntityChunk {Hash32
$sel:hash:EntityChunk :: Hash32
hash :: Hash32
hash, CBORBytes TempEntity
$sel:entityCBOR:EntityChunk :: CBORBytes TempEntity
entityCBOR :: CBORBytes TempEntity
entityCBOR})
)
[DownloadEntitiesChunk]
-> ([DownloadEntitiesChunk] -> [DownloadEntitiesChunk])
-> [DownloadEntitiesChunk]
forall a b. a -> (a -> b) -> b
& (DownloadEntitiesChunk
initialChunk DownloadEntitiesChunk
-> [DownloadEntitiesChunk] -> [DownloadEntitiesChunk]
forall a. a -> [a] -> [a]
:)
let stream :: ConduitT
()
(Element [DownloadEntitiesChunk])
(ExceptT SyncErr (ResourceT IO))
()
stream = [DownloadEntitiesChunk]
-> ConduitT
()
(Element [DownloadEntitiesChunk])
(ExceptT SyncErr (ResourceT IO))
()
forall (m :: * -> *) mono i.
(Monad m, MonoFoldable mono) =>
mono -> ConduitT i (Element mono) m ()
C.yieldMany [DownloadEntitiesChunk]
contents
Int -> Stream () DownloadEntitiesChunk -> m r
callback Int
totalEntities Stream () DownloadEntitiesChunk
stream
where
depsForCausal :: CausalHash -> (Int -> IO ()) -> Sqlite.Transaction (Map Hash32 (Sync.Entity Text Hash32 Hash32))
depsForCausal :: CausalHash
-> (Int -> IO ())
-> Transaction (Map Hash32 (Entity Text Hash32 Hash32))
depsForCausal CausalHash
causalHash Int -> IO ()
counter = do
(StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
-> Map Hash32 (Entity Text Hash32 Hash32)
-> Transaction (Map Hash32 (Entity Text Hash32 Hash32)))
-> Map Hash32 (Entity Text Hash32 Hash32)
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
-> Transaction (Map Hash32 (Entity Text Hash32 Hash32))
forall a b c. (a -> b -> c) -> b -> a -> c
flip StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
-> Map Hash32 (Entity Text Hash32 Hash32)
-> Transaction (Map Hash32 (Entity Text Hash32 Hash32))
forall (m :: * -> *) s a. Monad m => StateT s m a -> s -> m s
execStateT Map Hash32 (Entity Text Hash32 Hash32)
forall a. Monoid a => a
mempty (StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
-> Transaction (Map Hash32 (Entity Text Hash32 Hash32)))
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
-> Transaction (Map Hash32 (Entity Text Hash32 Hash32))
forall a b. (a -> b) -> a -> b
$ Hash32
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
expandEntities (CausalHash -> Hash32
causalHashToHash32 CausalHash
causalHash)
where
expandEntities :: Hash32 -> ((StateT (Map Hash32 (Sync.Entity Text Hash32 Hash32)) Sqlite.Transaction)) ()
expandEntities :: Hash32
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
expandEntities Hash32
hash32 = do
(Map Hash32 (Entity Text Hash32 Hash32) -> Bool)
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction Bool
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets (Hash32 -> Map Hash32 (Entity Text Hash32 Hash32) -> Bool
forall k a. Ord k => k -> Map k a -> Bool
Map.member Hash32
hash32) StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction Bool
-> (Bool
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ())
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
forall a b.
StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction a
-> (a
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction b)
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Bool
True -> ()
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
forall a.
a -> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Bool
False -> do
Entity Text Hash32 Hash32
entity <- Transaction (Entity Text Hash32 Hash32)
-> StateT
(Map Hash32 (Entity Text Hash32 Hash32))
Transaction
(Entity Text Hash32 Hash32)
forall (m :: * -> *) a.
Monad m =>
m a -> StateT (Map Hash32 (Entity Text Hash32 Hash32)) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (Transaction (Entity Text Hash32 Hash32)
-> StateT
(Map Hash32 (Entity Text Hash32 Hash32))
Transaction
(Entity Text Hash32 Hash32))
-> Transaction (Entity Text Hash32 Hash32)
-> StateT
(Map Hash32 (Entity Text Hash32 Hash32))
Transaction
(Entity Text Hash32 Hash32)
forall a b. (a -> b) -> a -> b
$ Hash32 -> Transaction (Entity Text Hash32 Hash32)
Sync.expectEntity Hash32
hash32
(Map Hash32 (Entity Text Hash32 Hash32)
-> Map Hash32 (Entity Text Hash32 Hash32))
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify (Hash32
-> Entity Text Hash32 Hash32
-> Map Hash32 (Entity Text Hash32 Hash32)
-> Map Hash32 (Entity Text Hash32 Hash32)
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert Hash32
hash32 Entity Text Hash32 Hash32
entity)
Transaction ()
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
forall (m :: * -> *) a.
Monad m =>
m a -> StateT (Map Hash32 (Entity Text Hash32 Hash32)) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (Transaction ()
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ())
-> (IO () -> Transaction ())
-> IO ()
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO () -> Transaction ()
forall a. HasCallStack => IO a -> Transaction a
Sqlite.unsafeIO (IO ()
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ())
-> IO ()
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
counter Int
1
Getting
(Traversed
() (StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction))
(Entity Text Hash32 Hash32)
Hash32
-> (Hash32
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ())
-> Entity Text Hash32 Hash32
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
forall (f :: * -> *) r s a.
Functor f =>
Getting (Traversed r f) s a -> (a -> f r) -> s -> f ()
traverseOf_ Getting
(Traversed
() (StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction))
(Entity Text Hash32 Hash32)
Hash32
forall (m :: * -> *) hash' hash text noSyncHash.
(Applicative m, Ord hash') =>
(hash -> m hash')
-> Entity text noSyncHash hash -> m (Entity text noSyncHash hash')
Sync.entityHashes_ Hash32
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
expandEntities Entity Text Hash32 Hash32
entity
_unNetString :: ConduitT ByteString ByteString StreamM ()
_unNetString :: ConduitT ByteString ByteString (ExceptT SyncErr (ResourceT IO)) ()
_unNetString = do
ByteString
bs <- Parser ByteString ByteString
-> ConduitT
ByteString ByteString (ExceptT SyncErr (ResourceT IO)) ByteString
forall a (m :: * -> *) b o.
(AttoparsecInput a, MonadThrow m) =>
Parser a b -> ConduitT a o m b
C.sinkParser (Parser ByteString ByteString
-> ConduitT
ByteString ByteString (ExceptT SyncErr (ResourceT IO)) ByteString)
-> Parser ByteString ByteString
-> ConduitT
ByteString ByteString (ExceptT SyncErr (ResourceT IO)) ByteString
forall a b. (a -> b) -> a -> b
$ do
Int
len <- Parser Int
forall a. Integral a => Parser a
A8.decimal
Char
_ <- Char -> Parser Char
A8.char Char
':'
ByteString
bs <- Int -> Parser ByteString ByteString
A.take Int
len
Char
_ <- Char -> Parser Char
A8.char Char
','
pure ByteString
bs
ByteString
-> ConduitT
ByteString ByteString (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
C.yield ByteString
bs
_decodeFramedEntity :: ByteString -> StreamM SyncV2.DownloadEntitiesChunk
_decodeFramedEntity :: ByteString -> StreamM DownloadEntitiesChunk
_decodeFramedEntity ByteString
bs = do
case ByteString -> Either DeserialiseFailure DownloadEntitiesChunk
forall a. Serialise a => ByteString -> Either DeserialiseFailure a
CBOR.deserialiseOrFail (ByteString -> ByteString
BL.fromStrict ByteString
bs) of
Left DeserialiseFailure
err -> SyncErr -> StreamM DownloadEntitiesChunk
forall a. SyncErr -> ExceptT SyncErr (ResourceT IO) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> StreamM DownloadEntitiesChunk)
-> (SyncError -> SyncErr)
-> SyncError
-> StreamM DownloadEntitiesChunk
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (SyncError -> PullError) -> SyncError -> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncError -> PullError
SyncV2.PullError'Sync (SyncError -> StreamM DownloadEntitiesChunk)
-> SyncError -> StreamM DownloadEntitiesChunk
forall a b. (a -> b) -> a -> b
$ DeserialiseFailure -> SyncError
SyncV2.SyncErrorDeserializationFailure DeserialiseFailure
err
Right DownloadEntitiesChunk
chunk -> DownloadEntitiesChunk -> StreamM DownloadEntitiesChunk
forall a. a -> ExceptT SyncErr (ResourceT IO) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure DownloadEntitiesChunk
chunk
decodeUnframedEntities :: forall a. (CBOR.Serialise a) => Stream ByteString a
decodeUnframedEntities :: forall a. Serialise a => Stream ByteString a
decodeUnframedEntities = (forall a.
ExceptT SyncErr (ST RealWorld) a
-> ExceptT SyncErr (ResourceT IO) a)
-> ConduitT ByteString a (ExceptT SyncErr (ST RealWorld)) ()
-> ConduitT ByteString a (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) (n :: * -> *) i o r.
Monad m =>
(forall a. m a -> n a) -> ConduitT i o m r -> ConduitT i o n r
C.transPipe ((ST RealWorld (Either SyncErr a)
-> ResourceT IO (Either SyncErr a))
-> ExceptT SyncErr (ST RealWorld) a
-> ExceptT SyncErr (ResourceT IO) a
forall (m :: * -> *) e a (n :: * -> *) e' b.
(m (Either e a) -> n (Either e' b))
-> ExceptT e m a -> ExceptT e' n b
mapExceptT (IO (Either SyncErr a) -> ResourceT IO (Either SyncErr a)
forall (m :: * -> *) a. Monad m => m a -> ResourceT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (IO (Either SyncErr a) -> ResourceT IO (Either SyncErr a))
-> (ST RealWorld (Either SyncErr a) -> IO (Either SyncErr a))
-> ST RealWorld (Either SyncErr a)
-> ResourceT IO (Either SyncErr a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ST RealWorld (Either SyncErr a) -> IO (Either SyncErr a)
forall a. ST RealWorld a -> IO a
stToIO)) (ConduitT ByteString a (ExceptT SyncErr (ST RealWorld)) ()
-> ConduitT ByteString a (ExceptT SyncErr (ResourceT IO)) ())
-> ConduitT ByteString a (ExceptT SyncErr (ST RealWorld)) ()
-> ConduitT ByteString a (ExceptT SyncErr (ResourceT IO)) ()
forall a b. (a -> b) -> a -> b
$ do
ConduitT
ByteString a (ExceptT SyncErr (ST RealWorld)) (Maybe ByteString)
forall (m :: * -> *) i o. Monad m => ConduitT i o m (Maybe i)
C.await ConduitT
ByteString a (ExceptT SyncErr (ST RealWorld)) (Maybe ByteString)
-> (Maybe ByteString
-> ConduitT ByteString a (ExceptT SyncErr (ST RealWorld)) ())
-> ConduitT ByteString a (ExceptT SyncErr (ST RealWorld)) ()
forall a b.
ConduitT ByteString a (ExceptT SyncErr (ST RealWorld)) a
-> (a -> ConduitT ByteString a (ExceptT SyncErr (ST RealWorld)) b)
-> ConduitT ByteString a (ExceptT SyncErr (ST RealWorld)) b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Maybe ByteString
Nothing -> () -> ConduitT ByteString a (ExceptT SyncErr (ST RealWorld)) ()
forall a.
a -> ConduitT ByteString a (ExceptT SyncErr (ST RealWorld)) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Just ByteString
bs -> do
Maybe ByteString -> ST RealWorld (IDecode RealWorld a)
d <- ConduitT
ByteString
a
(ExceptT SyncErr (ST RealWorld))
(Maybe ByteString -> ST RealWorld (IDecode RealWorld a))
forall s.
ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a))
newDecoder
ByteString
-> (Maybe ByteString -> ST RealWorld (IDecode RealWorld a))
-> ConduitT ByteString a (ExceptT SyncErr (ST RealWorld)) ()
forall s.
ByteString
-> (Maybe ByteString -> ST s (IDecode s a))
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
loop ByteString
bs Maybe ByteString -> ST RealWorld (IDecode RealWorld a)
d
where
newDecoder :: ConduitT ByteString a (ExceptT SyncErr (ST s)) (Maybe ByteString -> ST s (CBOR.IDecode s a))
newDecoder :: forall s.
ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a))
newDecoder = do
(ExceptT SyncErr (ST s) (IDecode s a)
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) (IDecode s a)
forall (m :: * -> *) a. Monad m => m a -> ConduitT ByteString a m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ExceptT SyncErr (ST s) (IDecode s a)
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) (IDecode s a))
-> (ST s (IDecode s a) -> ExceptT SyncErr (ST s) (IDecode s a))
-> ST s (IDecode s a)
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) (IDecode s a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ST s (IDecode s a) -> ExceptT SyncErr (ST s) (IDecode s a)
forall (m :: * -> *) a. Monad m => m a -> ExceptT SyncErr m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift) ST s (IDecode s a)
forall a s. Serialise a => ST s (IDecode s a)
CBOR.deserialiseIncremental ConduitT ByteString a (ExceptT SyncErr (ST s)) (IDecode s a)
-> (IDecode s a
-> ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a)))
-> ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a))
forall a b.
ConduitT ByteString a (ExceptT SyncErr (ST s)) a
-> (a -> ConduitT ByteString a (ExceptT SyncErr (ST s)) b)
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
CBOR.Done ByteString
_ ByteOffset
_ a
_ -> SyncErr
-> ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a))
forall a.
SyncErr -> ConduitT ByteString a (ExceptT SyncErr (ST s)) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr
-> ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a)))
-> (SyncError -> SyncErr)
-> SyncError
-> ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (SyncError -> PullError) -> SyncError -> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncError -> PullError
SyncV2.PullError'Sync (SyncError
-> ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a)))
-> SyncError
-> ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a))
forall a b. (a -> b) -> a -> b
$ Text -> SyncError
SyncV2.SyncErrorStreamFailure Text
"Invalid initial decoder"
CBOR.Fail ByteString
_ ByteOffset
_ DeserialiseFailure
err -> SyncErr
-> ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a))
forall a.
SyncErr -> ConduitT ByteString a (ExceptT SyncErr (ST s)) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr
-> ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a)))
-> (SyncError -> SyncErr)
-> SyncError
-> ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (SyncError -> PullError) -> SyncError -> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncError -> PullError
SyncV2.PullError'Sync (SyncError
-> ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a)))
-> SyncError
-> ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a))
forall a b. (a -> b) -> a -> b
$ DeserialiseFailure -> SyncError
SyncV2.SyncErrorDeserializationFailure DeserialiseFailure
err
CBOR.Partial Maybe ByteString -> ST s (IDecode s a)
k -> (Maybe ByteString -> ST s (IDecode s a))
-> ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a))
forall a. a -> ConduitT ByteString a (ExceptT SyncErr (ST s)) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe ByteString -> ST s (IDecode s a)
k
loop :: ByteString -> (Maybe ByteString -> ST s (CBOR.IDecode s a)) -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
loop :: forall s.
ByteString
-> (Maybe ByteString -> ST s (IDecode s a))
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
loop ByteString
bs Maybe ByteString -> ST s (IDecode s a)
k = do
(ExceptT SyncErr (ST s) (IDecode s a)
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) (IDecode s a)
forall (m :: * -> *) a. Monad m => m a -> ConduitT ByteString a m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ExceptT SyncErr (ST s) (IDecode s a)
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) (IDecode s a))
-> (ST s (IDecode s a) -> ExceptT SyncErr (ST s) (IDecode s a))
-> ST s (IDecode s a)
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) (IDecode s a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ST s (IDecode s a) -> ExceptT SyncErr (ST s) (IDecode s a)
forall (m :: * -> *) a. Monad m => m a -> ExceptT SyncErr m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift) (Maybe ByteString -> ST s (IDecode s a)
k (ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just ByteString
bs)) ConduitT ByteString a (ExceptT SyncErr (ST s)) (IDecode s a)
-> (IDecode s a
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ())
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall a b.
ConduitT ByteString a (ExceptT SyncErr (ST s)) a
-> (a -> ConduitT ByteString a (ExceptT SyncErr (ST s)) b)
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
CBOR.Fail ByteString
_ ByteOffset
_ DeserialiseFailure
err -> SyncErr -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall a.
SyncErr -> ConduitT ByteString a (ExceptT SyncErr (ST s)) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ())
-> (SyncError -> SyncErr)
-> SyncError
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (SyncError -> PullError) -> SyncError -> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncError -> PullError
SyncV2.PullError'Sync (SyncError -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ())
-> SyncError -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall a b. (a -> b) -> a -> b
$ DeserialiseFailure -> SyncError
SyncV2.SyncErrorDeserializationFailure DeserialiseFailure
err
CBOR.Partial Maybe ByteString -> ST s (IDecode s a)
k' -> do
Maybe ByteString
nextBS <- ConduitT ByteString a (ExceptT SyncErr (ST s)) (Maybe ByteString)
forall (m :: * -> *) i o. Monad m => ConduitT i o m (Maybe i)
C.await
case Maybe ByteString
nextBS of
Maybe ByteString
Nothing -> do
(ExceptT SyncErr (ST s) (IDecode s a)
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) (IDecode s a)
forall (m :: * -> *) a. Monad m => m a -> ConduitT ByteString a m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ExceptT SyncErr (ST s) (IDecode s a)
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) (IDecode s a))
-> (ST s (IDecode s a) -> ExceptT SyncErr (ST s) (IDecode s a))
-> ST s (IDecode s a)
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) (IDecode s a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ST s (IDecode s a) -> ExceptT SyncErr (ST s) (IDecode s a)
forall (m :: * -> *) a. Monad m => m a -> ExceptT SyncErr m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift) (Maybe ByteString -> ST s (IDecode s a)
k' Maybe ByteString
forall a. Maybe a
Nothing) ConduitT ByteString a (ExceptT SyncErr (ST s)) (IDecode s a)
-> (IDecode s a
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ())
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall a b.
ConduitT ByteString a (ExceptT SyncErr (ST s)) a
-> (a -> ConduitT ByteString a (ExceptT SyncErr (ST s)) b)
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
CBOR.Done ByteString
_ ByteOffset
_ a
a -> a -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
C.yield a
a
CBOR.Fail ByteString
_ ByteOffset
_ DeserialiseFailure
err -> SyncErr -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall a.
SyncErr -> ConduitT ByteString a (ExceptT SyncErr (ST s)) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ())
-> (SyncError -> SyncErr)
-> SyncError
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (SyncError -> PullError) -> SyncError -> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncError -> PullError
SyncV2.PullError'Sync (SyncError -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ())
-> SyncError -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall a b. (a -> b) -> a -> b
$ DeserialiseFailure -> SyncError
SyncV2.SyncErrorDeserializationFailure DeserialiseFailure
err
CBOR.Partial Maybe ByteString -> ST s (IDecode s a)
_ -> SyncErr -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall a.
SyncErr -> ConduitT ByteString a (ExceptT SyncErr (ST s)) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ())
-> (SyncError -> SyncErr)
-> SyncError
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (SyncError -> PullError) -> SyncError -> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncError -> PullError
SyncV2.PullError'Sync (SyncError -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ())
-> SyncError -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall a b. (a -> b) -> a -> b
$ Text -> SyncError
SyncV2.SyncErrorStreamFailure Text
"Unexpected end of input"
Just ByteString
bs' ->
ByteString
-> (Maybe ByteString -> ST s (IDecode s a))
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall s.
ByteString
-> (Maybe ByteString -> ST s (IDecode s a))
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
loop ByteString
bs' Maybe ByteString -> ST s (IDecode s a)
k'
CBOR.Done ByteString
rem ByteOffset
_ a
a -> do
a -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
C.yield a
a
if ByteString -> Bool
BS.null ByteString
rem
then do
ConduitT ByteString a (ExceptT SyncErr (ST s)) (Maybe ByteString)
forall (m :: * -> *) i o. Monad m => ConduitT i o m (Maybe i)
C.await ConduitT ByteString a (ExceptT SyncErr (ST s)) (Maybe ByteString)
-> (Maybe ByteString
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ())
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall a b.
ConduitT ByteString a (ExceptT SyncErr (ST s)) a
-> (a -> ConduitT ByteString a (ExceptT SyncErr (ST s)) b)
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Maybe ByteString
Nothing -> () -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall a. a -> ConduitT ByteString a (ExceptT SyncErr (ST s)) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Just ByteString
bs'' -> do
Maybe ByteString -> ST s (IDecode s a)
k <- ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a))
forall s.
ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a))
newDecoder
ByteString
-> (Maybe ByteString -> ST s (IDecode s a))
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall s.
ByteString
-> (Maybe ByteString -> ST s (IDecode s a))
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
loop ByteString
bs'' Maybe ByteString -> ST s (IDecode s a)
k
else do
Maybe ByteString -> ST s (IDecode s a)
k <- ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a))
forall s.
ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a))
newDecoder
ByteString
-> (Maybe ByteString -> ST s (IDecode s a))
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall s.
ByteString
-> (Maybe ByteString -> ST s (IDecode s a))
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
loop ByteString
rem Maybe ByteString -> ST s (IDecode s a)
k
type SyncAPI = ("ucm" Servant.:> "v2" Servant.:> "sync" Servant.:> SyncV2.API)
syncAPI :: Proxy SyncAPI
syncAPI :: Proxy SyncAPI
syncAPI = forall t. Proxy t
forall {k} (t :: k). Proxy t
Proxy @SyncAPI
downloadEntitiesStreamClientM :: SyncV2.DownloadEntitiesRequest -> Servant.ClientM (Servant.SourceT IO (CBORStream SyncV2.DownloadEntitiesChunk))
causalDependenciesStreamClientM :: SyncV2.CausalDependenciesRequest -> Servant.ClientM (Servant.SourceT IO (CBORStream SyncV2.CausalDependenciesChunk))
SyncV2.Routes
{ $sel:downloadEntitiesStream:Routes :: forall mode.
Routes mode
-> mode :- ("entities" :> ("download" :> DownloadEntitiesStream))
downloadEntitiesStream = AsClientT ClientM
:- ("entities" :> ("download" :> DownloadEntitiesStream))
DownloadEntitiesRequest
-> ClientM (SourceIO (CBORStream DownloadEntitiesChunk))
downloadEntitiesStreamClientM,
$sel:causalDependenciesStream:Routes :: forall mode.
Routes mode
-> mode
:- ("entities" :> ("dependencies" :> CausalDependenciesStream))
causalDependenciesStream = AsClientT ClientM
:- ("entities" :> ("dependencies" :> CausalDependenciesStream))
CausalDependenciesRequest
-> ClientM (SourceIO (CBORStream CausalDependenciesChunk))
causalDependenciesStreamClientM
} = Proxy SyncAPI -> Client ClientM SyncAPI
forall api.
HasClient ClientM api =>
Proxy api -> Client ClientM api
Servant.client Proxy SyncAPI
syncAPI
withConduit :: forall r chunk. (CBOR.Serialise chunk) => Servant.ClientEnv -> (Stream () chunk -> StreamM r) -> Servant.ClientM (Servant.SourceIO (CBORStream chunk)) -> StreamM r
withConduit :: forall r chunk.
Serialise chunk =>
ClientEnv
-> (Stream () chunk -> StreamM r)
-> ClientM (SourceIO (CBORStream chunk))
-> StreamM r
withConduit ClientEnv
clientEnv Stream () chunk -> StreamM r
callback ClientM (SourceIO (CBORStream chunk))
clientM = do
ResourceT IO (Either SyncErr r) -> StreamM r
forall e (m :: * -> *) a. m (Either e a) -> ExceptT e m a
ExceptT (ResourceT IO (Either SyncErr r) -> StreamM r)
-> ResourceT IO (Either SyncErr r) -> StreamM r
forall a b. (a -> b) -> a -> b
$ ((forall a. ResourceT IO a -> IO a) -> IO (Either SyncErr r))
-> ResourceT IO (Either SyncErr r)
forall b.
((forall a. ResourceT IO a -> IO a) -> IO b) -> ResourceT IO b
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO \forall a. ResourceT IO a -> IO a
runInIO -> do
ClientM (SourceIO (CBORStream chunk))
-> ClientEnv
-> (Either ClientError (SourceIO (CBORStream chunk))
-> IO (Either SyncErr r))
-> IO (Either SyncErr r)
forall a b.
ClientM a -> ClientEnv -> (Either ClientError a -> IO b) -> IO b
Servant.withClientM ClientM (SourceIO (CBORStream chunk))
clientM ClientEnv
clientEnv ((Either ClientError (SourceIO (CBORStream chunk))
-> IO (Either SyncErr r))
-> IO (Either SyncErr r))
-> (Either ClientError (SourceIO (CBORStream chunk))
-> IO (Either SyncErr r))
-> IO (Either SyncErr r)
forall a b. (a -> b) -> a -> b
$ \case
Left ClientError
err -> Either SyncErr r -> IO (Either SyncErr r)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either SyncErr r -> IO (Either SyncErr r))
-> (CodeserverTransportError -> Either SyncErr r)
-> CodeserverTransportError
-> IO (Either SyncErr r)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncErr -> Either SyncErr r
forall a b. a -> Either a b
Left (SyncErr -> Either SyncErr r)
-> (CodeserverTransportError -> SyncErr)
-> CodeserverTransportError
-> Either SyncErr r
forall b c a. (b -> c) -> (a -> b) -> a -> c
. CodeserverTransportError -> SyncErr
forall e. CodeserverTransportError -> SyncError e
TransportError (CodeserverTransportError -> IO (Either SyncErr r))
-> CodeserverTransportError -> IO (Either SyncErr r)
forall a b. (a -> b) -> a -> b
$ (ClientEnv -> ClientError -> CodeserverTransportError
handleClientError ClientEnv
clientEnv ClientError
err)
Right SourceIO (CBORStream chunk)
sourceT -> do
ConduitT () (CBORStream chunk) (ExceptT SyncErr (ResourceT IO)) ()
conduit <- IO
(ConduitT
() (CBORStream chunk) (ExceptT SyncErr (ResourceT IO)) ())
-> IO
(ConduitT
() (CBORStream chunk) (ExceptT SyncErr (ResourceT IO)) ())
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO
(ConduitT
() (CBORStream chunk) (ExceptT SyncErr (ResourceT IO)) ())
-> IO
(ConduitT
() (CBORStream chunk) (ExceptT SyncErr (ResourceT IO)) ()))
-> IO
(ConduitT
() (CBORStream chunk) (ExceptT SyncErr (ResourceT IO)) ())
-> IO
(ConduitT
() (CBORStream chunk) (ExceptT SyncErr (ResourceT IO)) ())
forall a b. (a -> b) -> a -> b
$ SourceIO (CBORStream chunk)
-> IO
(ConduitT
() (CBORStream chunk) (ExceptT SyncErr (ResourceT IO)) ())
forall chunk a. FromSourceIO chunk a => SourceIO chunk -> IO a
Servant.fromSourceIO SourceIO (CBORStream chunk)
sourceT
(ResourceT IO (Either SyncErr r) -> IO (Either SyncErr r)
forall a. ResourceT IO a -> IO a
runInIO (ResourceT IO (Either SyncErr r) -> IO (Either SyncErr r))
-> (StreamM r -> ResourceT IO (Either SyncErr r))
-> StreamM r
-> IO (Either SyncErr r)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamM r -> ResourceT IO (Either SyncErr r)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (StreamM r -> IO (Either SyncErr r))
-> StreamM r -> IO (Either SyncErr r)
forall a b. (a -> b) -> a -> b
$ Stream () chunk -> StreamM r
callback (ConduitT () (CBORStream chunk) (ExceptT SyncErr (ResourceT IO)) ()
conduit ConduitT () (CBORStream chunk) (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT
(CBORStream chunk) chunk (ExceptT SyncErr (ResourceT IO)) ()
-> Stream () chunk
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| ConduitT
(CBORStream chunk) chunk (ExceptT SyncErr (ResourceT IO)) ()
forall a. Serialise a => Stream (CBORStream a) a
unpackCBORBytesStream))
unpackCBORBytesStream :: (CBOR.Serialise a) => Stream (CBORStream a) a
unpackCBORBytesStream :: forall a. Serialise a => Stream (CBORStream a) a
unpackCBORBytesStream =
(CBORStream a -> ByteString)
-> ConduitT
(CBORStream a) ByteString (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a b. Monad m => (a -> b) -> ConduitT a b m ()
C.map (ByteString -> ByteString
BL.toStrict (ByteString -> ByteString)
-> (CBORStream a -> ByteString) -> CBORStream a -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. Coercible a b => a -> b
forall a b. Coercible a b => a -> b
coerce @_ @BL.ByteString) ConduitT
(CBORStream a) ByteString (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT ByteString a (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT (CBORStream a) a (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| ConduitT ByteString a (ExceptT SyncErr (ResourceT IO)) ()
forall a. Serialise a => Stream ByteString a
decodeUnframedEntities
handleClientError :: Servant.ClientEnv -> Servant.ClientError -> CodeserverTransportError
handleClientError :: ClientEnv -> ClientError -> CodeserverTransportError
handleClientError ClientEnv
clientEnv ClientError
err =
case ClientError
err of
Servant.FailureResponse RequestF () (BaseUrl, ByteString)
_req Response
resp ->
case Status -> Int
HTTP.statusCode (Status -> Int) -> Status -> Int
forall a b. (a -> b) -> a -> b
$ Response -> Status
forall a. ResponseF a -> Status
Servant.responseStatusCode Response
resp of
Int
401 -> BaseUrl -> CodeserverTransportError
Unauthenticated (ClientEnv -> BaseUrl
Servant.baseUrl ClientEnv
clientEnv)
Int
403 -> Text -> CodeserverTransportError
PermissionDenied (Text -> Text
Text.Lazy.toStrict (Text -> Text) -> (ByteString -> Text) -> ByteString -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Text
Text.Lazy.decodeUtf8 (ByteString -> Text) -> ByteString -> Text
forall a b. (a -> b) -> a -> b
$ Response -> ByteString
forall a. ResponseF a -> a
Servant.responseBody Response
resp)
Int
408 -> CodeserverTransportError
Timeout
Int
429 -> CodeserverTransportError
RateLimitExceeded
Int
504 -> CodeserverTransportError
Timeout
Int
_ -> Response -> CodeserverTransportError
UnexpectedResponse Response
resp
Servant.DecodeFailure Text
msg Response
resp -> Text -> Response -> CodeserverTransportError
DecodeFailure Text
msg Response
resp
Servant.UnsupportedContentType MediaType
_ct Response
resp -> Response -> CodeserverTransportError
UnexpectedResponse Response
resp
Servant.InvalidContentTypeHeader Response
resp -> Response -> CodeserverTransportError
UnexpectedResponse Response
resp
Servant.ConnectionError SomeException
_ -> BaseUrl -> CodeserverTransportError
UnreachableCodeserver (ClientEnv -> BaseUrl
Servant.baseUrl ClientEnv
clientEnv)
httpStreamEntities ::
Auth.AuthenticatedHttpClient ->
Servant.BaseUrl ->
SyncV2.DownloadEntitiesRequest ->
(SyncV2.StreamInitInfo -> Stream () SyncV2.EntityChunk -> StreamM ()) ->
StreamM ()
httpStreamEntities :: AuthenticatedHttpClient
-> BaseUrl
-> DownloadEntitiesRequest
-> (StreamInitInfo
-> Stream () EntityChunk -> ExceptT SyncErr (ResourceT IO) ())
-> ExceptT SyncErr (ResourceT IO) ()
httpStreamEntities (Auth.AuthenticatedHttpClient Manager
httpClient) BaseUrl
unisonShareUrl DownloadEntitiesRequest
req StreamInitInfo
-> Stream () EntityChunk -> ExceptT SyncErr (ResourceT IO) ()
callback = do
let clientEnv :: ClientEnv
clientEnv =
(Manager -> BaseUrl -> ClientEnv
Servant.mkClientEnv Manager
httpClient BaseUrl
unisonShareUrl)
{ Servant.makeClientRequest = \BaseUrl
url Request
request ->
(BaseUrl -> Request -> IO Request
Servant.defaultMakeClientRequest BaseUrl
url Request
request)
IO Request -> (Request -> Request) -> IO Request
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \Request
r ->
Request
r
{ Http.Client.responseTimeout = Http.Client.responseTimeoutNone
}
}
(DownloadEntitiesRequest
-> ClientM (SourceIO (CBORStream DownloadEntitiesChunk))
downloadEntitiesStreamClientM DownloadEntitiesRequest
req) ClientM (SourceIO (CBORStream DownloadEntitiesChunk))
-> (ClientM (SourceIO (CBORStream DownloadEntitiesChunk))
-> ExceptT SyncErr (ResourceT IO) ())
-> ExceptT SyncErr (ResourceT IO) ()
forall a b. a -> (a -> b) -> b
& ClientEnv
-> (Stream () DownloadEntitiesChunk
-> ExceptT SyncErr (ResourceT IO) ())
-> ClientM (SourceIO (CBORStream DownloadEntitiesChunk))
-> ExceptT SyncErr (ResourceT IO) ()
forall r chunk.
Serialise chunk =>
ClientEnv
-> (Stream () chunk -> StreamM r)
-> ClientM (SourceIO (CBORStream chunk))
-> StreamM r
withConduit ClientEnv
clientEnv \Stream () DownloadEntitiesChunk
stream -> do
(StreamInitInfo
init, Stream () EntityChunk
entityStream) <- Stream () DownloadEntitiesChunk
-> StreamM (StreamInitInfo, Stream () EntityChunk)
initializeStream Stream () DownloadEntitiesChunk
stream
StreamInitInfo
-> Stream () EntityChunk -> ExceptT SyncErr (ResourceT IO) ()
callback StreamInitInfo
init Stream () EntityChunk
entityStream
initializeStream :: Stream () SyncV2.DownloadEntitiesChunk -> StreamM (SyncV2.StreamInitInfo, Stream () SyncV2.EntityChunk)
initializeStream :: Stream () DownloadEntitiesChunk
-> StreamM (StreamInitInfo, Stream () EntityChunk)
initializeStream Stream () DownloadEntitiesChunk
stream = do
(SealedConduitT
() DownloadEntitiesChunk (ExceptT SyncErr (ResourceT IO)) ()
streamRemainder, Maybe DownloadEntitiesChunk
init) <- Stream () DownloadEntitiesChunk
stream Stream () DownloadEntitiesChunk
-> ConduitT
DownloadEntitiesChunk
Void
(ExceptT SyncErr (ResourceT IO))
(Maybe DownloadEntitiesChunk)
-> ExceptT
SyncErr
(ResourceT IO)
(SealedConduitT
() DownloadEntitiesChunk (ExceptT SyncErr (ResourceT IO)) (),
Maybe DownloadEntitiesChunk)
forall (m :: * -> *) a b.
Monad m =>
ConduitT () a m ()
-> ConduitT a Void m b -> m (SealedConduitT () a m (), b)
C.$$+ ConduitT
DownloadEntitiesChunk
Void
(ExceptT SyncErr (ResourceT IO))
(Maybe DownloadEntitiesChunk)
forall (m :: * -> *) i o. Monad m => ConduitT i o m (Maybe i)
C.headC
case Maybe DownloadEntitiesChunk
init of
Maybe DownloadEntitiesChunk
Nothing -> SyncErr -> StreamM (StreamInitInfo, Stream () EntityChunk)
forall a. SyncErr -> ExceptT SyncErr (ResourceT IO) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> StreamM (StreamInitInfo, Stream () EntityChunk))
-> (SyncError -> SyncErr)
-> SyncError
-> StreamM (StreamInitInfo, Stream () EntityChunk)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (SyncError -> PullError) -> SyncError -> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncError -> PullError
SyncV2.PullError'Sync (SyncError -> StreamM (StreamInitInfo, Stream () EntityChunk))
-> SyncError -> StreamM (StreamInitInfo, Stream () EntityChunk)
forall a b. (a -> b) -> a -> b
$ SyncError
SyncV2.SyncErrorMissingInitialChunk
Just DownloadEntitiesChunk
chunk -> do
case DownloadEntitiesChunk
chunk of
SyncV2.InitialC StreamInitInfo
info -> do
let entityStream :: Stream () EntityChunk
entityStream = SealedConduitT
() DownloadEntitiesChunk (ExceptT SyncErr (ResourceT IO)) ()
-> Stream () DownloadEntitiesChunk
forall (m :: * -> *) i o r.
Monad m =>
SealedConduitT i o m r -> ConduitT i o m r
C.unsealConduitT SealedConduitT
() DownloadEntitiesChunk (ExceptT SyncErr (ResourceT IO)) ()
streamRemainder Stream () DownloadEntitiesChunk
-> ConduitT
DownloadEntitiesChunk
EntityChunk
(ExceptT SyncErr (ResourceT IO))
()
-> Stream () EntityChunk
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| (DownloadEntitiesChunk
-> ExceptT SyncErr (ResourceT IO) EntityChunk)
-> ConduitT
DownloadEntitiesChunk
EntityChunk
(ExceptT SyncErr (ResourceT IO))
()
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> ConduitT a b m ()
C.mapM DownloadEntitiesChunk -> ExceptT SyncErr (ResourceT IO) EntityChunk
parseEntity
(StreamInitInfo, Stream () EntityChunk)
-> StreamM (StreamInitInfo, Stream () EntityChunk)
forall a. a -> ExceptT SyncErr (ResourceT IO) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((StreamInitInfo, Stream () EntityChunk)
-> StreamM (StreamInitInfo, Stream () EntityChunk))
-> (StreamInitInfo, Stream () EntityChunk)
-> StreamM (StreamInitInfo, Stream () EntityChunk)
forall a b. (a -> b) -> a -> b
$ (StreamInitInfo
info, Stream () EntityChunk
entityStream)
SyncV2.EntityC EntityChunk
_ -> do
SyncErr -> StreamM (StreamInitInfo, Stream () EntityChunk)
forall a. SyncErr -> ExceptT SyncErr (ResourceT IO) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> StreamM (StreamInitInfo, Stream () EntityChunk))
-> (SyncError -> SyncErr)
-> SyncError
-> StreamM (StreamInitInfo, Stream () EntityChunk)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (SyncError -> PullError) -> SyncError -> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncError -> PullError
SyncV2.PullError'Sync (SyncError -> StreamM (StreamInitInfo, Stream () EntityChunk))
-> SyncError -> StreamM (StreamInitInfo, Stream () EntityChunk)
forall a b. (a -> b) -> a -> b
$ SyncError
SyncV2.SyncErrorMissingInitialChunk
SyncV2.ErrorC (SyncV2.ErrorChunk DownloadEntitiesError
err) -> SyncErr -> StreamM (StreamInitInfo, Stream () EntityChunk)
forall a. SyncErr -> ExceptT SyncErr (ResourceT IO) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> StreamM (StreamInitInfo, Stream () EntityChunk))
-> (DownloadEntitiesError -> SyncErr)
-> DownloadEntitiesError
-> StreamM (StreamInitInfo, Stream () EntityChunk)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (DownloadEntitiesError -> PullError)
-> DownloadEntitiesError
-> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DownloadEntitiesError -> PullError
SyncV2.PullError'DownloadEntities (DownloadEntitiesError
-> StreamM (StreamInitInfo, Stream () EntityChunk))
-> DownloadEntitiesError
-> StreamM (StreamInitInfo, Stream () EntityChunk)
forall a b. (a -> b) -> a -> b
$ DownloadEntitiesError
err
where
parseEntity :: SyncV2.DownloadEntitiesChunk -> StreamM SyncV2.EntityChunk
parseEntity :: DownloadEntitiesChunk -> ExceptT SyncErr (ResourceT IO) EntityChunk
parseEntity = \case
SyncV2.EntityC EntityChunk
chunk -> EntityChunk -> ExceptT SyncErr (ResourceT IO) EntityChunk
forall a. a -> ExceptT SyncErr (ResourceT IO) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure EntityChunk
chunk
SyncV2.ErrorC (SyncV2.ErrorChunk DownloadEntitiesError
err) -> SyncErr -> ExceptT SyncErr (ResourceT IO) EntityChunk
forall a. SyncErr -> ExceptT SyncErr (ResourceT IO) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> ExceptT SyncErr (ResourceT IO) EntityChunk)
-> (PullError -> SyncErr)
-> PullError
-> ExceptT SyncErr (ResourceT IO) EntityChunk
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> ExceptT SyncErr (ResourceT IO) EntityChunk)
-> PullError -> ExceptT SyncErr (ResourceT IO) EntityChunk
forall a b. (a -> b) -> a -> b
$ DownloadEntitiesError -> PullError
SyncV2.PullError'DownloadEntities DownloadEntitiesError
err
SyncV2.InitialC {} -> SyncErr -> ExceptT SyncErr (ResourceT IO) EntityChunk
forall a. SyncErr -> ExceptT SyncErr (ResourceT IO) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> ExceptT SyncErr (ResourceT IO) EntityChunk)
-> (PullError -> SyncErr)
-> PullError
-> ExceptT SyncErr (ResourceT IO) EntityChunk
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> ExceptT SyncErr (ResourceT IO) EntityChunk)
-> PullError -> ExceptT SyncErr (ResourceT IO) EntityChunk
forall a b. (a -> b) -> a -> b
$ SyncError -> PullError
SyncV2.PullError'Sync SyncError
SyncV2.SyncErrorMisplacedInitialChunk
httpStreamCausalDependencies ::
forall r.
Auth.AuthenticatedHttpClient ->
Servant.BaseUrl ->
SyncV2.CausalDependenciesRequest ->
(Stream () SyncV2.CausalDependenciesChunk -> StreamM r) ->
StreamM r
httpStreamCausalDependencies :: forall r.
AuthenticatedHttpClient
-> BaseUrl
-> CausalDependenciesRequest
-> (Stream () CausalDependenciesChunk -> StreamM r)
-> StreamM r
httpStreamCausalDependencies (Auth.AuthenticatedHttpClient Manager
httpClient) BaseUrl
unisonShareUrl CausalDependenciesRequest
req Stream () CausalDependenciesChunk -> StreamM r
callback = do
let clientEnv :: ClientEnv
clientEnv =
(Manager -> BaseUrl -> ClientEnv
Servant.mkClientEnv Manager
httpClient BaseUrl
unisonShareUrl)
{ Servant.makeClientRequest = \BaseUrl
url Request
request ->
(BaseUrl -> Request -> IO Request
Servant.defaultMakeClientRequest BaseUrl
url Request
request)
IO Request -> (Request -> Request) -> IO Request
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \Request
r ->
Request
r
{ Http.Client.responseTimeout = Http.Client.responseTimeoutNone
}
}
(CausalDependenciesRequest
-> ClientM (SourceIO (CBORStream CausalDependenciesChunk))
causalDependenciesStreamClientM CausalDependenciesRequest
req) ClientM (SourceIO (CBORStream CausalDependenciesChunk))
-> (ClientM (SourceIO (CBORStream CausalDependenciesChunk))
-> StreamM r)
-> StreamM r
forall a b. a -> (a -> b) -> b
& ClientEnv
-> (Stream () CausalDependenciesChunk -> StreamM r)
-> ClientM (SourceIO (CBORStream CausalDependenciesChunk))
-> StreamM r
forall r chunk.
Serialise chunk =>
ClientEnv
-> (Stream () chunk -> StreamM r)
-> ClientM (SourceIO (CBORStream chunk))
-> StreamM r
withConduit ClientEnv
clientEnv Stream () CausalDependenciesChunk -> StreamM r
callback
negotiateKnownCausals ::
Servant.BaseUrl ->
SyncV2.BranchRef ->
Share.HashJWT ->
Cli (Either (SyncError SyncV2.PullError) (Set Hash32))
negotiateKnownCausals :: BaseUrl
-> BranchRef -> HashJWT -> Cli (Either SyncErr (Set Hash32))
negotiateKnownCausals BaseUrl
unisonShareUrl BranchRef
branchRef HashJWT
hashJwt = do
Cli.Env {AuthenticatedHttpClient
$sel:authHTTPClient:Env :: Env -> AuthenticatedHttpClient
authHTTPClient :: AuthenticatedHttpClient
authHTTPClient, Codebase IO Symbol Ann
$sel:codebase:Env :: Env -> Codebase IO Symbol Ann
codebase :: Codebase IO Symbol Ann
codebase} <- Cli Env
forall r (m :: * -> *). MonadReader r m => m r
ask
IO () -> Cli ()
forall a. IO a -> Cli a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Cli ()) -> IO () -> Cli ()
forall a b. (a -> b) -> a -> b
$ Handle -> Text -> IO ()
Text.hPutStrLn Handle
IO.stderr (Text -> IO ()) -> Text -> IO ()
forall a b. (a -> b) -> a -> b
$ Text
" 🔎 Identifying missing entities..."
FilePath
-> Cli (Either SyncErr (Set Hash32))
-> Cli (Either SyncErr (Set Hash32))
forall (m :: * -> *) a. MonadIO m => FilePath -> m a -> m a
Timing.time FilePath
"Causal Negotiation" (Cli (Either SyncErr (Set Hash32))
-> Cli (Either SyncErr (Set Hash32)))
-> Cli (Either SyncErr (Set Hash32))
-> Cli (Either SyncErr (Set Hash32))
forall a b. (a -> b) -> a -> b
$ do
IO (Either SyncErr (Set Hash32))
-> Cli (Either SyncErr (Set Hash32))
forall a. IO a -> Cli a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either SyncErr (Set Hash32))
-> Cli (Either SyncErr (Set Hash32)))
-> (ExceptT SyncErr (ResourceT IO) (Set Hash32)
-> IO (Either SyncErr (Set Hash32)))
-> ExceptT SyncErr (ResourceT IO) (Set Hash32)
-> Cli (Either SyncErr (Set Hash32))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ResourceT IO (Either SyncErr (Set Hash32))
-> IO (Either SyncErr (Set Hash32))
forall (m :: * -> *) a. MonadUnliftIO m => ResourceT m a -> m a
C.runResourceT (ResourceT IO (Either SyncErr (Set Hash32))
-> IO (Either SyncErr (Set Hash32)))
-> (ExceptT SyncErr (ResourceT IO) (Set Hash32)
-> ResourceT IO (Either SyncErr (Set Hash32)))
-> ExceptT SyncErr (ResourceT IO) (Set Hash32)
-> IO (Either SyncErr (Set Hash32))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ExceptT SyncErr (ResourceT IO) (Set Hash32)
-> ResourceT IO (Either SyncErr (Set Hash32))
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT SyncErr (ResourceT IO) (Set Hash32)
-> Cli (Either SyncErr (Set Hash32)))
-> ExceptT SyncErr (ResourceT IO) (Set Hash32)
-> Cli (Either SyncErr (Set Hash32))
forall a b. (a -> b) -> a -> b
$ AuthenticatedHttpClient
-> BaseUrl
-> CausalDependenciesRequest
-> (Stream () CausalDependenciesChunk
-> ExceptT SyncErr (ResourceT IO) (Set Hash32))
-> ExceptT SyncErr (ResourceT IO) (Set Hash32)
forall r.
AuthenticatedHttpClient
-> BaseUrl
-> CausalDependenciesRequest
-> (Stream () CausalDependenciesChunk -> StreamM r)
-> StreamM r
httpStreamCausalDependencies
AuthenticatedHttpClient
authHTTPClient
BaseUrl
unisonShareUrl
SyncV2.CausalDependenciesRequest {BranchRef
branchRef :: BranchRef
$sel:branchRef:CausalDependenciesRequest :: BranchRef
branchRef, $sel:rootCausal:CausalDependenciesRequest :: HashJWT
rootCausal = HashJWT
hashJwt}
\Stream () CausalDependenciesChunk
stream -> do
[Hash32] -> Set Hash32
forall a. Ord a => [a] -> Set a
Set.fromList ([Hash32] -> Set Hash32)
-> ExceptT SyncErr (ResourceT IO) [Hash32]
-> ExceptT SyncErr (ResourceT IO) (Set Hash32)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ConduitT () Void (ExceptT SyncErr (ResourceT IO)) [Hash32]
-> ExceptT SyncErr (ResourceT IO) [Hash32]
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
C.runConduit (Stream () CausalDependenciesChunk
stream Stream () CausalDependenciesChunk
-> ConduitT
CausalDependenciesChunk
Void
(ExceptT SyncErr (ResourceT IO))
[Hash32]
-> ConduitT () Void (ExceptT SyncErr (ResourceT IO)) [Hash32]
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| (CausalDependenciesChunk -> (Hash32, DependencyType))
-> ConduitT
CausalDependenciesChunk
(Hash32, DependencyType)
(ExceptT SyncErr (ResourceT IO))
()
forall (m :: * -> *) a b. Monad m => (a -> b) -> ConduitT a b m ()
C.map CausalDependenciesChunk -> (Hash32, DependencyType)
unpack ConduitT
CausalDependenciesChunk
(Hash32, DependencyType)
(ExceptT SyncErr (ResourceT IO))
()
-> ConduitT
(Hash32, DependencyType)
Void
(ExceptT SyncErr (ResourceT IO))
[Hash32]
-> ConduitT
CausalDependenciesChunk
Void
(ExceptT SyncErr (ResourceT IO))
[Hash32]
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| Codebase IO Symbol Ann -> Stream (Hash32, DependencyType) Hash32
forall v a.
Codebase IO v a -> Stream (Hash32, DependencyType) Hash32
findKnownDeps Codebase IO Symbol Ann
codebase Stream (Hash32, DependencyType) Hash32
-> ConduitT Hash32 Void (ExceptT SyncErr (ResourceT IO)) [Hash32]
-> ConduitT
(Hash32, DependencyType)
Void
(ExceptT SyncErr (ResourceT IO))
[Hash32]
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| ConduitT Hash32 Void (ExceptT SyncErr (ResourceT IO)) [Hash32]
forall (m :: * -> *) a o. Monad m => ConduitT a o m [a]
C.sinkList)
where
findKnownDeps :: Codebase.Codebase IO v a -> Stream (Hash32, DependencyType) Hash32
findKnownDeps :: forall v a.
Codebase IO v a -> Stream (Hash32, DependencyType) Hash32
findKnownDeps Codebase IO v a
codebase = do
ConduitT
(Hash32, DependencyType)
Hash32
(ExceptT SyncErr (ResourceT IO))
(Maybe (Hash32, DependencyType))
forall (m :: * -> *) i o. Monad m => ConduitT i o m (Maybe i)
C.await ConduitT
(Hash32, DependencyType)
Hash32
(ExceptT SyncErr (ResourceT IO))
(Maybe (Hash32, DependencyType))
-> (Maybe (Hash32, DependencyType)
-> Stream (Hash32, DependencyType) Hash32)
-> Stream (Hash32, DependencyType) Hash32
forall a b.
ConduitT
(Hash32, DependencyType) Hash32 (ExceptT SyncErr (ResourceT IO)) a
-> (a
-> ConduitT
(Hash32, DependencyType) Hash32 (ExceptT SyncErr (ResourceT IO)) b)
-> ConduitT
(Hash32, DependencyType) Hash32 (ExceptT SyncErr (ResourceT IO)) b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Just (Hash32
hash, DependencyType
LibDependency) -> do
ConduitT
(Hash32, DependencyType)
Hash32
(ExceptT SyncErr (ResourceT IO))
Bool
-> Stream (Hash32, DependencyType) Hash32
-> Stream (Hash32, DependencyType) Hash32
forall (m :: * -> *). Monad m => m Bool -> m () -> m ()
whenM (ExceptT SyncErr (ResourceT IO) Bool
-> ConduitT
(Hash32, DependencyType)
Hash32
(ExceptT SyncErr (ResourceT IO))
Bool
forall (m :: * -> *) a.
Monad m =>
m a -> ConduitT (Hash32, DependencyType) Hash32 m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ExceptT SyncErr (ResourceT IO) Bool
-> ConduitT
(Hash32, DependencyType)
Hash32
(ExceptT SyncErr (ResourceT IO))
Bool)
-> ExceptT SyncErr (ResourceT IO) Bool
-> ConduitT
(Hash32, DependencyType)
Hash32
(ExceptT SyncErr (ResourceT IO))
Bool
forall a b. (a -> b) -> a -> b
$ Codebase IO v a -> Hash32 -> ExceptT SyncErr (ResourceT IO) Bool
forall v a.
Codebase IO v a -> Hash32 -> ExceptT SyncErr (ResourceT IO) Bool
haveCausalHash Codebase IO v a
codebase Hash32
hash) (Hash32 -> Stream (Hash32, DependencyType) Hash32
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
C.yield Hash32
hash)
Codebase IO v a -> Stream (Hash32, DependencyType) Hash32
forall v a.
Codebase IO v a -> Stream (Hash32, DependencyType) Hash32
findKnownDeps Codebase IO v a
codebase
Just (Hash32
hash, DependencyType
CausalSpineDependency) -> do
ExceptT SyncErr (ResourceT IO) Bool
-> ConduitT
(Hash32, DependencyType)
Hash32
(ExceptT SyncErr (ResourceT IO))
Bool
forall (m :: * -> *) a.
Monad m =>
m a -> ConduitT (Hash32, DependencyType) Hash32 m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (Codebase IO v a -> Hash32 -> ExceptT SyncErr (ResourceT IO) Bool
forall v a.
Codebase IO v a -> Hash32 -> ExceptT SyncErr (ResourceT IO) Bool
haveCausalHash Codebase IO v a
codebase Hash32
hash) ConduitT
(Hash32, DependencyType)
Hash32
(ExceptT SyncErr (ResourceT IO))
Bool
-> (Bool -> Stream (Hash32, DependencyType) Hash32)
-> Stream (Hash32, DependencyType) Hash32
forall a b.
ConduitT
(Hash32, DependencyType) Hash32 (ExceptT SyncErr (ResourceT IO)) a
-> (a
-> ConduitT
(Hash32, DependencyType) Hash32 (ExceptT SyncErr (ResourceT IO)) b)
-> ConduitT
(Hash32, DependencyType) Hash32 (ExceptT SyncErr (ResourceT IO)) b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Bool
True -> do
Hash32 -> Stream (Hash32, DependencyType) Hash32
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
C.yield Hash32
hash
Bool
False -> do
Codebase IO v a -> Stream (Hash32, DependencyType) Hash32
forall v a.
Codebase IO v a -> Stream (Hash32, DependencyType) Hash32
findKnownDeps Codebase IO v a
codebase
Maybe (Hash32, DependencyType)
Nothing -> () -> Stream (Hash32, DependencyType) Hash32
forall a.
a
-> ConduitT
(Hash32, DependencyType) Hash32 (ExceptT SyncErr (ResourceT IO)) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
unpack :: SyncV2.CausalDependenciesChunk -> (Hash32, DependencyType)
unpack :: CausalDependenciesChunk -> (Hash32, DependencyType)
unpack = \case
SyncV2.CausalHashDepC {Hash32
causalHash :: Hash32
$sel:causalHash:CausalHashDepC :: CausalDependenciesChunk -> Hash32
causalHash, DependencyType
dependencyType :: DependencyType
$sel:dependencyType:CausalHashDepC :: CausalDependenciesChunk -> DependencyType
dependencyType} -> (Hash32
causalHash, DependencyType
dependencyType)
haveCausalHash :: Codebase.Codebase IO v a -> Hash32 -> StreamM Bool
haveCausalHash :: forall v a.
Codebase IO v a -> Hash32 -> ExceptT SyncErr (ResourceT IO) Bool
haveCausalHash Codebase IO v a
codebase Hash32
causalHash = do
IO Bool -> ExceptT SyncErr (ResourceT IO) Bool
forall a. IO a -> ExceptT SyncErr (ResourceT IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> ExceptT SyncErr (ResourceT IO) Bool)
-> IO Bool -> ExceptT SyncErr (ResourceT IO) Bool
forall a b. (a -> b) -> a -> b
$ Codebase IO v a -> Transaction Bool -> IO Bool
forall (m :: * -> *) v a b.
MonadIO m =>
Codebase m v a -> Transaction b -> m b
Codebase.runTransaction Codebase IO v a
codebase do
Hash32 -> Transaction Bool
Q.causalExistsByHash32 Hash32
causalHash
counterProgress :: (MonadIO m, MonadUnliftIO n) => (Int -> Text) -> ((Int -> m ()) -> n a) -> n a
counterProgress :: forall (m :: * -> *) (n :: * -> *) a.
(MonadIO m, MonadUnliftIO n) =>
(Int -> Text) -> ((Int -> m ()) -> n a) -> n a
counterProgress Int -> Text
msgBuilder (Int -> m ()) -> n a
action = do
TVar Int
counterVar <- Int -> n (TVar Int)
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
IO.newTVarIO (Int
0 :: Int)
((forall a. n a -> IO a) -> IO a) -> n a
forall b. ((forall a. n a -> IO a) -> IO b) -> n b
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
IO.withRunInIO \forall a. n a -> IO a
toIO -> do
IO a -> IO a
forall (m :: * -> *) a. (MonadIO m, MonadMask m) => m a -> m a
Console.Regions.displayConsoleRegions do
RegionLayout -> (ConsoleRegion -> IO a) -> IO a
forall (m :: * -> *) a.
(MonadIO m, MonadMask m) =>
RegionLayout -> (ConsoleRegion -> m a) -> m a
Console.Regions.withConsoleRegion RegionLayout
Console.Regions.Linear \ConsoleRegion
region -> do
ConsoleRegion -> STM Text -> IO ()
forall v (m :: * -> *).
(ToRegionContent v, LiftRegion m) =>
ConsoleRegion -> v -> m ()
Console.Regions.setConsoleRegion ConsoleRegion
region do
Int
num <- TVar Int -> STM Int
forall a. TVar a -> STM a
IO.readTVar TVar Int
counterVar
pure $ Int -> Text
msgBuilder Int
num
n a -> IO a
forall a. n a -> IO a
toIO (n a -> IO a) -> n a -> IO a
forall a b. (a -> b) -> a -> b
$ (Int -> m ()) -> n a
action ((Int -> m ()) -> n a) -> (Int -> m ()) -> n a
forall a b. (a -> b) -> a -> b
$ \Int
i -> do
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
IO.atomically (TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
IO.modifyTVar' TVar Int
counterVar (Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
i))
withStreamProgressCallback :: (MonadIO m, MonadUnliftIO n) => Maybe Int -> (ConduitT i i m () -> n a) -> n a
withStreamProgressCallback :: forall (m :: * -> *) (n :: * -> *) i a.
(MonadIO m, MonadUnliftIO n) =>
Maybe Int -> (ConduitT i i m () -> n a) -> n a
withStreamProgressCallback Maybe Int
total ConduitT i i m () -> n a
action = do
let msg :: Int -> Text
msg Int
n = Text
"\n 📦 Unpacked " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Int -> Text
forall a. Show a => a -> Text
tShow Int
n Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> (Int -> Text) -> Maybe Int -> Text
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Text
"" (\Int
total -> Text
" / " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Int -> Text
forall a. Show a => a -> Text
tShow Int
total) Maybe Int
total Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" entities...\n\n"
let action' :: (Int -> m ()) -> n a
action' Int -> m ()
f = ConduitT i i m () -> n a
action ((i -> m ()) -> ConduitT i i m ()
forall (m :: * -> *) a. Monad m => (a -> m ()) -> ConduitT a a m ()
C.iterM \i
_i -> Int -> m ()
f Int
1)
(Int -> Text) -> ((Int -> m ()) -> n a) -> n a
forall (m :: * -> *) (n :: * -> *) a.
(MonadIO m, MonadUnliftIO n) =>
(Int -> Text) -> ((Int -> m ()) -> n a) -> n a
counterProgress Int -> Text
msg (Int -> m ()) -> n a
action'
withEntitySavingCallback :: (MonadUnliftIO m) => Maybe Int -> ((Int -> m ()) -> m a) -> m a
withEntitySavingCallback :: forall (m :: * -> *) a.
MonadUnliftIO m =>
Maybe Int -> ((Int -> m ()) -> m a) -> m a
withEntitySavingCallback Maybe Int
total (Int -> m ()) -> m a
action = do
let msg :: Int -> Text
msg Int
n = Text
"\n 💾 Saved " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Int -> Text
forall a. Show a => a -> Text
tShow Int
n Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> (Int -> Text) -> Maybe Int -> Text
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Text
"" (\Int
total -> Text
" / " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Int -> Text
forall a. Show a => a -> Text
tShow Int
total) Maybe Int
total Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" new entities...\n\n"
(Int -> Text) -> ((Int -> m ()) -> m a) -> m a
forall (m :: * -> *) (n :: * -> *) a.
(MonadIO m, MonadUnliftIO n) =>
(Int -> Text) -> ((Int -> m ()) -> n a) -> n a
counterProgress Int -> Text
msg (Int -> m ()) -> m a
action
withEntityLoadingCallback :: (MonadUnliftIO m) => ((Int -> m ()) -> m a) -> m a
withEntityLoadingCallback :: forall (m :: * -> *) a.
MonadUnliftIO m =>
((Int -> m ()) -> m a) -> m a
withEntityLoadingCallback (Int -> m ()) -> m a
action = do
let msg :: a -> Text
msg a
n = Text
"\n 📦 Unpacked " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> a -> Text
forall a. Show a => a -> Text
tShow a
n Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" entities...\n\n"
(Int -> Text) -> ((Int -> m ()) -> m a) -> m a
forall (m :: * -> *) (n :: * -> *) a.
(MonadIO m, MonadUnliftIO n) =>
(Int -> Text) -> ((Int -> m ()) -> n a) -> n a
counterProgress Int -> Text
forall a. Show a => a -> Text
msg (Int -> m ()) -> m a
action