{-# LANGUAGE ApplicativeDo #-}
module Unison.Share.SyncV2
( syncFromFile,
syncToFile,
syncFromCodebase,
syncFromCodeserver,
)
where
import Codec.Serialise qualified as CBOR
import Conduit (ConduitT)
import Conduit qualified as C
import Control.Concurrent.STM.TBMQueue qualified as STM
import Control.Lens
import Control.Monad.Except
import Control.Monad.Reader (ask)
import Control.Monad.ST (ST, stToIO)
import Control.Monad.State
import Data.Attoparsec.ByteString qualified as A
import Data.Attoparsec.ByteString.Char8 qualified as A8
import Data.ByteString qualified as BS
import Data.ByteString.Lazy qualified as BL
import Data.Conduit.Attoparsec qualified as C
import Data.Conduit.Combinators qualified as C
import Data.Conduit.List qualified as CL
import Data.Conduit.Zlib qualified as C
import Data.Foldable qualified as Foldable
import Data.Graph qualified as Graph
import Data.Map qualified as Map
import Data.Proxy
import Data.Set qualified as Set
import Data.Text qualified as Text
import Data.Text.Lazy qualified as Text.Lazy
import Data.Text.Lazy.Encoding qualified as Text.Lazy
import Data.Vector (Vector)
import Data.Vector qualified as Vector
import Network.HTTP.Client qualified as Http.Client
import Network.HTTP.Types qualified as HTTP
import Servant.API qualified as Servant
import Servant.Client.Streaming qualified as Servant
import Servant.Conduit ()
import Servant.Types.SourceT qualified as Servant
import System.Console.Regions qualified as Console.Regions
import U.Codebase.HashTags (CausalHash)
import U.Codebase.Sqlite.DbId (CausalHashId)
import U.Codebase.Sqlite.Queries qualified as Q
import U.Codebase.Sqlite.TempEntity (TempEntity)
import U.Codebase.Sqlite.V2.HashHandle (v2HashHandle)
import Unison.Auth.HTTPClient qualified as Auth
import Unison.Cli.Monad (Cli)
import Unison.Cli.Monad qualified as Cli
import Unison.Codebase qualified as Codebase
import Unison.Hash32 (Hash32)
import Unison.Prelude
import Unison.Share.API.Hash qualified as Share
import Unison.Share.ExpectedHashMismatches (expectedCausalHashMismatches, expectedComponentHashMismatches)
import Unison.Share.Sync.Types
import Unison.Sqlite qualified as Sqlite
import Unison.Sync.Common (causalHashToHash32, hash32ToCausalHash, tempEntityToEntity)
import Unison.Sync.Common qualified as Sync
import Unison.Sync.EntityValidation qualified as EV
import Unison.Sync.Types qualified as Share
import Unison.Sync.Types qualified as Sync
import Unison.SyncV2.API (Routes (downloadEntitiesStream))
import Unison.SyncV2.API qualified as SyncV2
import Unison.SyncV2.Types (CBORBytes, CBORStream, DependencyType (..))
import Unison.SyncV2.Types qualified as SyncV2
import Unison.Util.Monoid qualified as Monoid
import Unison.Util.Servant.CBOR qualified as CBOR
import Unison.Util.Timing qualified as Timing
import UnliftIO qualified as IO
import UnliftIO.Async qualified as Async
import UnliftIO.STM qualified as STM
type Stream i o = ConduitT i o StreamM ()
type SyncErr = SyncError SyncV2.PullError
type StreamM = (ExceptT SyncErr (C.ResourceT IO))
data ProgressCallbacks
= ProgressCallbacks
{ ProgressCallbacks -> Int -> IO ()
setTotal :: Int -> IO (),
ProgressCallbacks -> Int -> IO ()
downloadCounter :: Int -> IO (),
ProgressCallbacks -> IO ()
doneDownloading :: IO (),
ProgressCallbacks -> Int -> IO ()
importCounter :: Int -> IO ()
}
syncToFile ::
Codebase.Codebase IO v a ->
CausalHash ->
Maybe SyncV2.BranchRef ->
FilePath ->
IO (Either SyncErr ())
syncToFile :: forall v a.
Codebase IO v a
-> CausalHash
-> Maybe BranchRef
-> FilePath
-> IO (Either SyncErr ())
syncToFile Codebase IO v a
codebase CausalHash
rootHash Maybe BranchRef
mayBranchRef FilePath
destFilePath = do
IO (Either SyncErr ()) -> IO (Either SyncErr ())
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either SyncErr ()) -> IO (Either SyncErr ()))
-> IO (Either SyncErr ()) -> IO (Either SyncErr ())
forall a b. (a -> b) -> a -> b
$ Codebase IO v a -> forall x. (Connection -> IO x) -> IO x
forall (m :: * -> *) v a.
Codebase m v a -> forall x. (Connection -> m x) -> m x
Codebase.withConnection Codebase IO v a
codebase \Connection
conn -> do
ResourceT IO (Either SyncErr ()) -> IO (Either SyncErr ())
forall (m :: * -> *) a. MonadUnliftIO m => ResourceT m a -> m a
C.runResourceT (ResourceT IO (Either SyncErr ()) -> IO (Either SyncErr ()))
-> ResourceT IO (Either SyncErr ()) -> IO (Either SyncErr ())
forall a b. (a -> b) -> a -> b
$
Connection
-> CausalHash
-> Maybe BranchRef
-> (Int
-> Stream () DownloadEntitiesChunk
-> ResourceT IO (Either SyncErr ()))
-> ResourceT IO (Either SyncErr ())
forall (m :: * -> *) r.
MonadIO m =>
Connection
-> CausalHash
-> Maybe BranchRef
-> (Int -> Stream () DownloadEntitiesChunk -> m r)
-> m r
withCodebaseEntityStream Connection
conn CausalHash
rootHash Maybe BranchRef
mayBranchRef \Int
mayTotal Stream () DownloadEntitiesChunk
stream -> do
Maybe Int
-> (ConduitT
DownloadEntitiesChunk
DownloadEntitiesChunk
(ExceptT SyncErr (ResourceT IO))
()
-> ResourceT IO (Either SyncErr ()))
-> ResourceT IO (Either SyncErr ())
forall (m :: * -> *) (n :: * -> *) i a.
(MonadIO m, MonadUnliftIO n) =>
Maybe Int -> (ConduitT i i m () -> n a) -> n a
syncToFileProgress (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
mayTotal) \ConduitT
DownloadEntitiesChunk
DownloadEntitiesChunk
(ExceptT SyncErr (ResourceT IO))
()
countC -> ExceptT SyncErr (ResourceT IO) ()
-> ResourceT IO (Either SyncErr ())
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT do
ConduitT () Void (ExceptT SyncErr (ResourceT IO)) ()
-> ExceptT SyncErr (ResourceT IO) ()
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
C.runConduit (ConduitT () Void (ExceptT SyncErr (ResourceT IO)) ()
-> ExceptT SyncErr (ResourceT IO) ())
-> ConduitT () Void (ExceptT SyncErr (ResourceT IO)) ()
-> ExceptT SyncErr (ResourceT IO) ()
forall a b. (a -> b) -> a -> b
$
Stream () DownloadEntitiesChunk
stream
Stream () DownloadEntitiesChunk
-> ConduitT
DownloadEntitiesChunk Void (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT () Void (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| ConduitT
DownloadEntitiesChunk
DownloadEntitiesChunk
(ExceptT SyncErr (ResourceT IO))
()
countC
ConduitT
DownloadEntitiesChunk
DownloadEntitiesChunk
(ExceptT SyncErr (ResourceT IO))
()
-> ConduitT
DownloadEntitiesChunk Void (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT
DownloadEntitiesChunk Void (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| (DownloadEntitiesChunk -> ByteString)
-> ConduitT
DownloadEntitiesChunk
ByteString
(ExceptT SyncErr (ResourceT IO))
()
forall (m :: * -> *) a b. Monad m => (a -> b) -> ConduitT a b m ()
C.map (ByteString -> ByteString
BL.toStrict (ByteString -> ByteString)
-> (DownloadEntitiesChunk -> ByteString)
-> DownloadEntitiesChunk
-> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DownloadEntitiesChunk -> ByteString
forall a. Serialise a => a -> ByteString
CBOR.serialise)
ConduitT
DownloadEntitiesChunk
ByteString
(ExceptT SyncErr (ResourceT IO))
()
-> ConduitT ByteString Void (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT
DownloadEntitiesChunk Void (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| (forall a. IO a -> ExceptT SyncErr (ResourceT IO) a)
-> ConduitT ByteString ByteString IO ()
-> ConduitT
ByteString ByteString (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) (n :: * -> *) i o r.
Monad m =>
(forall a. m a -> n a) -> ConduitT i o m r -> ConduitT i o n r
C.transPipe IO a -> ExceptT SyncErr (ResourceT IO) a
forall a. IO a -> ExceptT SyncErr (ResourceT IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO ConduitT ByteString ByteString IO ()
forall (m :: * -> *).
(MonadThrow m, PrimMonad m) =>
ConduitT ByteString ByteString m ()
C.gzip
ConduitT ByteString ByteString (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT ByteString Void (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT ByteString Void (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| FilePath
-> ConduitT ByteString Void (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) o.
MonadResource m =>
FilePath -> ConduitT ByteString o m ()
C.sinkFile FilePath
destFilePath
syncFromFile ::
Bool ->
FilePath ->
Cli (Either (SyncError SyncV2.PullError) (CausalHash, CausalHashId))
syncFromFile :: Bool -> FilePath -> Cli (Either SyncErr (CausalHash, CausalHashId))
syncFromFile Bool
shouldValidate FilePath
syncFilePath = do
Cli.Env {Codebase IO Symbol Ann
codebase :: Codebase IO Symbol Ann
$sel:codebase:Env :: Env -> Codebase IO Symbol Ann
codebase} <- Cli Env
forall r (m :: * -> *). MonadReader r m => m r
ask
Transaction () -> Cli ()
forall a. Transaction a -> Cli a
Cli.runTransaction Transaction ()
Q.clearTempEntityTables
IO (Either SyncErr (CausalHash, CausalHashId))
-> Cli (Either SyncErr (CausalHash, CausalHashId))
forall a. IO a -> Cli a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either SyncErr (CausalHash, CausalHashId))
-> Cli (Either SyncErr (CausalHash, CausalHashId)))
-> IO (Either SyncErr (CausalHash, CausalHashId))
-> Cli (Either SyncErr (CausalHash, CausalHashId))
forall a b. (a -> b) -> a -> b
$ Bool
-> (ProgressCallbacks
-> IO (Either SyncErr (CausalHash, CausalHashId)))
-> IO (Either SyncErr (CausalHash, CausalHashId))
forall (n :: * -> *) a.
MonadUnliftIO n =>
Bool -> (ProgressCallbacks -> n a) -> n a
withStreamProgress Bool
False \ProgressCallbacks
progressCounters -> do
ExceptT SyncErr IO (CausalHash, CausalHashId)
-> IO (Either SyncErr (CausalHash, CausalHashId))
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT do
(IO (Either SyncErr (CausalHash, CausalHashId))
-> IO (Either SyncErr (CausalHash, CausalHashId)))
-> ExceptT SyncErr IO (CausalHash, CausalHashId)
-> ExceptT SyncErr IO (CausalHash, CausalHashId)
forall (m :: * -> *) e a (n :: * -> *) e' b.
(m (Either e a) -> n (Either e' b))
-> ExceptT e m a -> ExceptT e' n b
mapExceptT IO (Either SyncErr (CausalHash, CausalHashId))
-> IO (Either SyncErr (CausalHash, CausalHashId))
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (ExceptT SyncErr IO (CausalHash, CausalHashId)
-> ExceptT SyncErr IO (CausalHash, CausalHashId))
-> ExceptT SyncErr IO (CausalHash, CausalHashId)
-> ExceptT SyncErr IO (CausalHash, CausalHashId)
forall a b. (a -> b) -> a -> b
$ FilePath
-> ExceptT SyncErr IO (CausalHash, CausalHashId)
-> ExceptT SyncErr IO (CausalHash, CausalHashId)
forall (m :: * -> *) a. MonadIO m => FilePath -> m a -> m a
Timing.time FilePath
"File Sync" (ExceptT SyncErr IO (CausalHash, CausalHashId)
-> ExceptT SyncErr IO (CausalHash, CausalHashId))
-> ExceptT SyncErr IO (CausalHash, CausalHashId)
-> ExceptT SyncErr IO (CausalHash, CausalHashId)
forall a b. (a -> b) -> a -> b
$ do
StreamInitInfo
header <- (ResourceT IO (Either SyncErr StreamInitInfo)
-> IO (Either SyncErr StreamInitInfo))
-> ExceptT SyncErr (ResourceT IO) StreamInitInfo
-> ExceptT SyncErr IO StreamInitInfo
forall (m :: * -> *) e a (n :: * -> *) e' b.
(m (Either e a) -> n (Either e' b))
-> ExceptT e m a -> ExceptT e' n b
mapExceptT ResourceT IO (Either SyncErr StreamInitInfo)
-> IO (Either SyncErr StreamInitInfo)
forall (m :: * -> *) a. MonadUnliftIO m => ResourceT m a -> m a
C.runResourceT (ExceptT SyncErr (ResourceT IO) StreamInitInfo
-> ExceptT SyncErr IO StreamInitInfo)
-> ExceptT SyncErr (ResourceT IO) StreamInitInfo
-> ExceptT SyncErr IO StreamInitInfo
forall a b. (a -> b) -> a -> b
$ do
let stream :: Stream () DownloadEntitiesChunk
stream = FilePath
-> ConduitT () ByteString (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) i.
MonadResource m =>
FilePath -> ConduitT i ByteString m ()
C.sourceFile FilePath
syncFilePath ConduitT () ByteString (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT
ByteString
DownloadEntitiesChunk
(ExceptT SyncErr (ResourceT IO))
()
-> Stream () DownloadEntitiesChunk
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| ConduitT ByteString ByteString (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *).
(PrimMonad m, MonadThrow m) =>
ConduitT ByteString ByteString m ()
C.ungzip ConduitT ByteString ByteString (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT
ByteString
DownloadEntitiesChunk
(ExceptT SyncErr (ResourceT IO))
()
-> ConduitT
ByteString
DownloadEntitiesChunk
(ExceptT SyncErr (ResourceT IO))
()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| ConduitT
ByteString
DownloadEntitiesChunk
(ExceptT SyncErr (ResourceT IO))
()
forall a. Serialise a => Stream ByteString a
decodeUnframedEntities
(StreamInitInfo
header, Stream () EntityChunk
rest) <- (Int -> IO ())
-> Stream () DownloadEntitiesChunk
-> StreamM (StreamInitInfo, Stream () EntityChunk)
initializeStream (ProgressCallbacks -> Int -> IO ()
setTotal ProgressCallbacks
progressCounters) Stream () DownloadEntitiesChunk
stream
ProgressCallbacks
-> Bool
-> Codebase IO Symbol Ann
-> StreamInitInfo
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
forall v a.
ProgressCallbacks
-> Bool
-> Codebase IO v a
-> StreamInitInfo
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
streamIntoCodebase ProgressCallbacks
progressCounters Bool
shouldValidate Codebase IO Symbol Ann
codebase StreamInitInfo
header Stream () EntityChunk
rest
pure StreamInitInfo
header
CausalHashId
chId <- Codebase IO Symbol Ann -> Hash32 -> ExceptT SyncErr IO CausalHashId
forall v a.
Codebase IO v a -> Hash32 -> ExceptT SyncErr IO CausalHashId
afterSyncChecks Codebase IO Symbol Ann
codebase (StreamInitInfo -> Hash32
SyncV2.rootCausalHash StreamInitInfo
header)
pure (Hash32 -> CausalHash
hash32ToCausalHash (Hash32 -> CausalHash) -> Hash32 -> CausalHash
forall a b. (a -> b) -> a -> b
$ StreamInitInfo -> Hash32
SyncV2.rootCausalHash StreamInitInfo
header, CausalHashId
chId)
syncFromCodebase ::
Bool ->
Sqlite.Connection ->
(Codebase.Codebase IO v a) ->
CausalHash ->
IO (Either (SyncError SyncV2.PullError) (CausalHash, CausalHashId))
syncFromCodebase :: forall v a.
Bool
-> Connection
-> Codebase IO v a
-> CausalHash
-> IO (Either SyncErr (CausalHash, CausalHashId))
syncFromCodebase Bool
shouldValidate Connection
srcConn Codebase IO v a
destCodebase CausalHash
causalHash = do
Connection -> Transaction () -> IO ()
forall (m :: * -> *) a.
(MonadIO m, HasCallStack) =>
Connection -> Transaction a -> m a
Sqlite.runTransaction Connection
srcConn Transaction ()
Q.clearTempEntityTables
Bool
-> (ProgressCallbacks
-> IO (Either SyncErr (CausalHash, CausalHashId)))
-> IO (Either SyncErr (CausalHash, CausalHashId))
forall (n :: * -> *) a.
MonadUnliftIO n =>
Bool -> (ProgressCallbacks -> n a) -> n a
withStreamProgress Bool
False \ProgressCallbacks
progressCounters -> do
IO (Either SyncErr (CausalHash, CausalHashId))
-> IO (Either SyncErr (CausalHash, CausalHashId))
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either SyncErr (CausalHash, CausalHashId))
-> IO (Either SyncErr (CausalHash, CausalHashId)))
-> (ExceptT SyncErr (ResourceT IO) (CausalHash, CausalHashId)
-> IO (Either SyncErr (CausalHash, CausalHashId)))
-> ExceptT SyncErr (ResourceT IO) (CausalHash, CausalHashId)
-> IO (Either SyncErr (CausalHash, CausalHashId))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ResourceT IO (Either SyncErr (CausalHash, CausalHashId))
-> IO (Either SyncErr (CausalHash, CausalHashId))
forall (m :: * -> *) a. MonadUnliftIO m => ResourceT m a -> m a
C.runResourceT (ResourceT IO (Either SyncErr (CausalHash, CausalHashId))
-> IO (Either SyncErr (CausalHash, CausalHashId)))
-> (ExceptT SyncErr (ResourceT IO) (CausalHash, CausalHashId)
-> ResourceT IO (Either SyncErr (CausalHash, CausalHashId)))
-> ExceptT SyncErr (ResourceT IO) (CausalHash, CausalHashId)
-> IO (Either SyncErr (CausalHash, CausalHashId))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ExceptT SyncErr (ResourceT IO) (CausalHash, CausalHashId)
-> ResourceT IO (Either SyncErr (CausalHash, CausalHashId))
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT SyncErr (ResourceT IO) (CausalHash, CausalHashId)
-> IO (Either SyncErr (CausalHash, CausalHashId)))
-> ExceptT SyncErr (ResourceT IO) (CausalHash, CausalHashId)
-> IO (Either SyncErr (CausalHash, CausalHashId))
forall a b. (a -> b) -> a -> b
$ Connection
-> CausalHash
-> Maybe BranchRef
-> (Int
-> Stream () DownloadEntitiesChunk
-> ExceptT SyncErr (ResourceT IO) (CausalHash, CausalHashId))
-> ExceptT SyncErr (ResourceT IO) (CausalHash, CausalHashId)
forall (m :: * -> *) r.
MonadIO m =>
Connection
-> CausalHash
-> Maybe BranchRef
-> (Int -> Stream () DownloadEntitiesChunk -> m r)
-> m r
withCodebaseEntityStream Connection
srcConn CausalHash
causalHash Maybe BranchRef
forall a. Maybe a
Nothing \Int
_total Stream () DownloadEntitiesChunk
entityStream -> do
(StreamInitInfo
header, Stream () EntityChunk
rest) <- (Int -> IO ())
-> Stream () DownloadEntitiesChunk
-> StreamM (StreamInitInfo, Stream () EntityChunk)
initializeStream (ProgressCallbacks -> Int -> IO ()
setTotal ProgressCallbacks
progressCounters) Stream () DownloadEntitiesChunk
entityStream
ProgressCallbacks
-> Bool
-> Codebase IO v a
-> StreamInitInfo
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
forall v a.
ProgressCallbacks
-> Bool
-> Codebase IO v a
-> StreamInitInfo
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
streamIntoCodebase ProgressCallbacks
progressCounters Bool
shouldValidate Codebase IO v a
destCodebase StreamInitInfo
header Stream () EntityChunk
rest
CausalHashId
chId <- (IO (Either SyncErr CausalHashId)
-> ResourceT IO (Either SyncErr CausalHashId))
-> ExceptT SyncErr IO CausalHashId
-> ExceptT SyncErr (ResourceT IO) CausalHashId
forall (m :: * -> *) e a (n :: * -> *) e' b.
(m (Either e a) -> n (Either e' b))
-> ExceptT e m a -> ExceptT e' n b
mapExceptT IO (Either SyncErr CausalHashId)
-> ResourceT IO (Either SyncErr CausalHashId)
forall a. IO a -> ResourceT IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Codebase IO v a -> Hash32 -> ExceptT SyncErr IO CausalHashId
forall v a.
Codebase IO v a -> Hash32 -> ExceptT SyncErr IO CausalHashId
afterSyncChecks Codebase IO v a
destCodebase (CausalHash -> Hash32
causalHashToHash32 CausalHash
causalHash))
pure (Hash32 -> CausalHash
hash32ToCausalHash (Hash32 -> CausalHash) -> Hash32 -> CausalHash
forall a b. (a -> b) -> a -> b
$ StreamInitInfo -> Hash32
SyncV2.rootCausalHash StreamInitInfo
header, CausalHashId
chId)
syncFromCodeserver ::
Bool ->
Servant.BaseUrl ->
SyncV2.BranchRef ->
Share.HashJWT ->
Cli (Either (SyncError SyncV2.PullError) (CausalHash, CausalHashId))
syncFromCodeserver :: Bool
-> BaseUrl
-> BranchRef
-> HashJWT
-> Cli (Either SyncErr (CausalHash, CausalHashId))
syncFromCodeserver Bool
shouldValidate BaseUrl
unisonShareUrl BranchRef
branchRef HashJWT
hashJwt = do
Cli.Env {AuthenticatedHttpClient
authHTTPClient :: AuthenticatedHttpClient
$sel:authHTTPClient:Env :: Env -> AuthenticatedHttpClient
authHTTPClient, Codebase IO Symbol Ann
$sel:codebase:Env :: Env -> Codebase IO Symbol Ann
codebase :: Codebase IO Symbol Ann
codebase} <- Cli Env
forall r (m :: * -> *). MonadReader r m => m r
ask
Transaction () -> Cli ()
forall a. Transaction a -> Cli a
Cli.runTransaction Transaction ()
Q.clearTempEntityTables
ExceptT SyncErr Cli (CausalHash, CausalHashId)
-> Cli (Either SyncErr (CausalHash, CausalHashId))
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT do
Set Hash32
knownHashes <- Cli (Either SyncErr (Set Hash32))
-> ExceptT SyncErr Cli (Set Hash32)
forall e (m :: * -> *) a. m (Either e a) -> ExceptT e m a
ExceptT (Cli (Either SyncErr (Set Hash32))
-> ExceptT SyncErr Cli (Set Hash32))
-> Cli (Either SyncErr (Set Hash32))
-> ExceptT SyncErr Cli (Set Hash32)
forall a b. (a -> b) -> a -> b
$ BaseUrl
-> BranchRef -> HashJWT -> Cli (Either SyncErr (Set Hash32))
negotiateKnownCausals BaseUrl
unisonShareUrl BranchRef
branchRef HashJWT
hashJwt
let hash :: Hash32
hash = HashJWT -> Hash32
Share.hashJWTHash HashJWT
hashJwt
Cli (Either SyncErr ()) -> ExceptT SyncErr Cli ()
forall e (m :: * -> *) a. m (Either e a) -> ExceptT e m a
ExceptT (Cli (Either SyncErr ()) -> ExceptT SyncErr Cli ())
-> Cli (Either SyncErr ()) -> ExceptT SyncErr Cli ()
forall a b. (a -> b) -> a -> b
$ do
(Transaction (Maybe EntityLocation) -> Cli (Maybe EntityLocation)
forall a. Transaction a -> Cli a
Cli.runTransaction (Hash32 -> Transaction (Maybe EntityLocation)
Q.entityLocation Hash32
hash)) Cli (Maybe EntityLocation)
-> (Maybe EntityLocation -> Cli (Either SyncErr ()))
-> Cli (Either SyncErr ())
forall a b. Cli a -> (a -> Cli b) -> Cli b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Just EntityLocation
Q.EntityInMainStorage -> Either SyncErr () -> Cli (Either SyncErr ())
forall a. a -> Cli a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either SyncErr () -> Cli (Either SyncErr ()))
-> Either SyncErr () -> Cli (Either SyncErr ())
forall a b. (a -> b) -> a -> b
$ () -> Either SyncErr ()
forall a b. b -> Either a b
Right ()
Maybe EntityLocation
_ -> IO (Either SyncErr ()) -> Cli (Either SyncErr ())
forall a. IO a -> Cli a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either SyncErr ()) -> Cli (Either SyncErr ()))
-> IO (Either SyncErr ()) -> Cli (Either SyncErr ())
forall a b. (a -> b) -> a -> b
$ Bool
-> (ProgressCallbacks -> IO (Either SyncErr ()))
-> IO (Either SyncErr ())
forall (n :: * -> *) a.
MonadUnliftIO n =>
Bool -> (ProgressCallbacks -> n a) -> n a
withStreamProgress Bool
True \ProgressCallbacks
progressCallbacks -> do
FilePath -> IO (Either SyncErr ()) -> IO (Either SyncErr ())
forall (m :: * -> *) a. MonadIO m => FilePath -> m a -> m a
Timing.time FilePath
"Entity Download" (IO (Either SyncErr ()) -> IO (Either SyncErr ()))
-> IO (Either SyncErr ()) -> IO (Either SyncErr ())
forall a b. (a -> b) -> a -> b
$ do
IO (Either SyncErr ()) -> IO (Either SyncErr ())
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either SyncErr ()) -> IO (Either SyncErr ()))
-> (ExceptT SyncErr (ResourceT IO) () -> IO (Either SyncErr ()))
-> ExceptT SyncErr (ResourceT IO) ()
-> IO (Either SyncErr ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ResourceT IO (Either SyncErr ()) -> IO (Either SyncErr ())
forall (m :: * -> *) a. MonadUnliftIO m => ResourceT m a -> m a
C.runResourceT (ResourceT IO (Either SyncErr ()) -> IO (Either SyncErr ()))
-> (ExceptT SyncErr (ResourceT IO) ()
-> ResourceT IO (Either SyncErr ()))
-> ExceptT SyncErr (ResourceT IO) ()
-> IO (Either SyncErr ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ExceptT SyncErr (ResourceT IO) ()
-> ResourceT IO (Either SyncErr ())
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT SyncErr (ResourceT IO) () -> IO (Either SyncErr ()))
-> ExceptT SyncErr (ResourceT IO) () -> IO (Either SyncErr ())
forall a b. (a -> b) -> a -> b
$ (Int -> IO ())
-> AuthenticatedHttpClient
-> BaseUrl
-> DownloadEntitiesRequest
-> (StreamInitInfo
-> Stream () EntityChunk -> ExceptT SyncErr (ResourceT IO) ())
-> ExceptT SyncErr (ResourceT IO) ()
httpStreamEntities
(ProgressCallbacks -> Int -> IO ()
setTotal ProgressCallbacks
progressCallbacks)
AuthenticatedHttpClient
authHTTPClient
BaseUrl
unisonShareUrl
SyncV2.DownloadEntitiesRequest {BranchRef
branchRef :: BranchRef
$sel:branchRef:DownloadEntitiesRequest :: BranchRef
branchRef, $sel:causalHash:DownloadEntitiesRequest :: HashJWT
causalHash = HashJWT
hashJwt, Set Hash32
knownHashes :: Set Hash32
$sel:knownHashes:DownloadEntitiesRequest :: Set Hash32
knownHashes}
\StreamInitInfo
header Stream () EntityChunk
stream -> do
Maybe Word64
-> (Word64 -> ExceptT SyncErr (ResourceT IO) ())
-> ExceptT SyncErr (ResourceT IO) ()
forall (m :: * -> *) a.
Applicative m =>
Maybe a -> (a -> m ()) -> m ()
whenJust (StreamInitInfo -> Maybe Word64
SyncV2.numEntities StreamInitInfo
header) (IO () -> ExceptT SyncErr (ResourceT IO) ()
forall a. IO a -> ExceptT SyncErr (ResourceT IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ExceptT SyncErr (ResourceT IO) ())
-> (Word64 -> IO ()) -> Word64 -> ExceptT SyncErr (ResourceT IO) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProgressCallbacks -> Int -> IO ()
setTotal ProgressCallbacks
progressCallbacks (Int -> IO ()) -> (Word64 -> Int) -> Word64 -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Word64 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral)
ProgressCallbacks
-> Bool
-> Codebase IO Symbol Ann
-> StreamInitInfo
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
forall v a.
ProgressCallbacks
-> Bool
-> Codebase IO v a
-> StreamInitInfo
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
streamIntoCodebase ProgressCallbacks
progressCallbacks Bool
shouldValidate Codebase IO Symbol Ann
codebase StreamInitInfo
header Stream () EntityChunk
stream
CausalHashId
chId <- (IO (Either SyncErr CausalHashId)
-> Cli (Either SyncErr CausalHashId))
-> ExceptT SyncErr IO CausalHashId
-> ExceptT SyncErr Cli CausalHashId
forall (m :: * -> *) e a (n :: * -> *) e' b.
(m (Either e a) -> n (Either e' b))
-> ExceptT e m a -> ExceptT e' n b
mapExceptT IO (Either SyncErr CausalHashId)
-> Cli (Either SyncErr CausalHashId)
forall a. IO a -> Cli a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Codebase IO Symbol Ann -> Hash32 -> ExceptT SyncErr IO CausalHashId
forall v a.
Codebase IO v a -> Hash32 -> ExceptT SyncErr IO CausalHashId
afterSyncChecks Codebase IO Symbol Ann
codebase Hash32
hash)
pure (Hash32 -> CausalHash
hash32ToCausalHash Hash32
hash, CausalHashId
chId)
validateAndSave :: Bool -> (Codebase.Codebase IO v a) -> Vector (Hash32, TempEntity) -> StreamM ()
validateAndSave :: forall v a.
Bool
-> Codebase IO v a
-> Vector (Hash32, TempEntity)
-> ExceptT SyncErr (ResourceT IO) ()
validateAndSave Bool
shouldValidate Codebase IO v a
codebase Vector (Hash32, TempEntity)
entities = do
let validateEntities :: IO (Either SyncErr ())
validateEntities =
ExceptT SyncErr IO () -> IO (Either SyncErr ())
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT SyncErr IO () -> IO (Either SyncErr ()))
-> ExceptT SyncErr IO () -> IO (Either SyncErr ())
forall a b. (a -> b) -> a -> b
$ Bool -> ExceptT SyncErr IO () -> ExceptT SyncErr IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
shouldValidate (Vector (Hash32, TempEntity) -> ExceptT SyncErr IO ()
batchValidateEntities Vector (Hash32, TempEntity)
entities)
ResourceT IO (Either SyncErr ())
-> ExceptT SyncErr (ResourceT IO) ()
forall e (m :: * -> *) a. m (Either e a) -> ExceptT e m a
ExceptT (ResourceT IO (Either SyncErr ())
-> ExceptT SyncErr (ResourceT IO) ())
-> (IO (Either SyncErr ()) -> ResourceT IO (Either SyncErr ()))
-> IO (Either SyncErr ())
-> ExceptT SyncErr (ResourceT IO) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO (Either SyncErr ()) -> ResourceT IO (Either SyncErr ())
forall a. IO a -> ResourceT IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either SyncErr ()) -> ExceptT SyncErr (ResourceT IO) ())
-> IO (Either SyncErr ()) -> ExceptT SyncErr (ResourceT IO) ()
forall a b. (a -> b) -> a -> b
$ IO (Either SyncErr ())
-> (Async (Either SyncErr ()) -> IO (Either SyncErr ()))
-> IO (Either SyncErr ())
forall (m :: * -> *) a b.
MonadUnliftIO m =>
m a -> (Async a -> m b) -> m b
IO.withAsync IO (Either SyncErr ())
validateEntities \Async (Either SyncErr ())
validationTask -> do
FilePath -> IO (Either SyncErr ()) -> IO (Either SyncErr ())
forall (m :: * -> *) a. MonadIO m => FilePath -> m a -> m a
Timing.time FilePath
"Inserting entities" (IO (Either SyncErr ()) -> IO (Either SyncErr ()))
-> IO (Either SyncErr ()) -> IO (Either SyncErr ())
forall a b. (a -> b) -> a -> b
$ Codebase IO v a
-> ExceptT SyncErr Transaction () -> IO (Either SyncErr ())
forall (m :: * -> *) v a e b.
MonadIO m =>
Codebase m v a -> ExceptT e Transaction b -> m (Either e b)
Codebase.runTransactionExceptT Codebase IO v a
codebase do
Vector (Hash32, TempEntity)
-> ((Hash32, TempEntity) -> ExceptT SyncErr Transaction ())
-> ExceptT SyncErr Transaction ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ Vector (Hash32, TempEntity)
entities \(Hash32
hash, TempEntity
entity) -> do
ExceptT SyncErr Transaction (Either CausalHashId ObjectId)
-> ExceptT SyncErr Transaction ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (ExceptT SyncErr Transaction (Either CausalHashId ObjectId)
-> ExceptT SyncErr Transaction ())
-> (Transaction (Either CausalHashId ObjectId)
-> ExceptT SyncErr Transaction (Either CausalHashId ObjectId))
-> Transaction (Either CausalHashId ObjectId)
-> ExceptT SyncErr Transaction ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Transaction (Either CausalHashId ObjectId)
-> ExceptT SyncErr Transaction (Either CausalHashId ObjectId)
forall (m :: * -> *) a. Monad m => m a -> ExceptT SyncErr m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (Transaction (Either CausalHashId ObjectId)
-> ExceptT SyncErr Transaction ())
-> Transaction (Either CausalHashId ObjectId)
-> ExceptT SyncErr Transaction ()
forall a b. (a -> b) -> a -> b
$ HashHandle
-> Hash32
-> TempEntity
-> Transaction (Either CausalHashId ObjectId)
Q.saveTempEntityInMain HashHandle
v2HashHandle Hash32
hash TempEntity
entity
Transaction (Either SyncErr ())
-> ExceptT SyncErr Transaction (Either SyncErr ())
forall (m :: * -> *) a. Monad m => m a -> ExceptT SyncErr m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (IO (Either SyncErr ()) -> Transaction (Either SyncErr ())
forall a. HasCallStack => IO a -> Transaction a
Sqlite.unsafeIO (Async (Either SyncErr ()) -> IO (Either SyncErr ())
forall (m :: * -> *) a. MonadIO m => Async a -> m a
IO.wait Async (Either SyncErr ())
validationTask)) ExceptT SyncErr Transaction (Either SyncErr ())
-> (Either SyncErr () -> ExceptT SyncErr Transaction ())
-> ExceptT SyncErr Transaction ()
forall a b.
ExceptT SyncErr Transaction a
-> (a -> ExceptT SyncErr Transaction b)
-> ExceptT SyncErr Transaction b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Left SyncErr
err -> SyncErr -> ExceptT SyncErr Transaction ()
forall a. SyncErr -> ExceptT SyncErr Transaction a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError SyncErr
err
Right ()
_ -> () -> ExceptT SyncErr Transaction ()
forall a. a -> ExceptT SyncErr Transaction a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
batchValidateEntities :: Vector (Hash32, TempEntity) -> ExceptT SyncErr IO ()
batchValidateEntities :: Vector (Hash32, TempEntity) -> ExceptT SyncErr IO ()
batchValidateEntities Vector (Hash32, TempEntity)
entities = do
Vector EntityValidationError
mismatches <- (Vector (Maybe EntityValidationError)
-> Vector EntityValidationError)
-> ExceptT SyncErr IO (Vector (Maybe EntityValidationError))
-> ExceptT SyncErr IO (Vector EntityValidationError)
forall a b.
(a -> b) -> ExceptT SyncErr IO a -> ExceptT SyncErr IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Vector (Maybe EntityValidationError)
-> Vector EntityValidationError
forall a. Vector (Maybe a) -> Vector a
Vector.catMaybes (ExceptT SyncErr IO (Vector (Maybe EntityValidationError))
-> ExceptT SyncErr IO (Vector EntityValidationError))
-> ExceptT SyncErr IO (Vector (Maybe EntityValidationError))
-> ExceptT SyncErr IO (Vector EntityValidationError)
forall a b. (a -> b) -> a -> b
$ IO (Vector (Maybe EntityValidationError))
-> ExceptT SyncErr IO (Vector (Maybe EntityValidationError))
forall a. IO a -> ExceptT SyncErr IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Vector (Maybe EntityValidationError))
-> ExceptT SyncErr IO (Vector (Maybe EntityValidationError)))
-> IO (Vector (Maybe EntityValidationError))
-> ExceptT SyncErr IO (Vector (Maybe EntityValidationError))
forall a b. (a -> b) -> a -> b
$ Vector (Hash32, TempEntity)
-> ((Hash32, TempEntity) -> IO (Maybe EntityValidationError))
-> IO (Vector (Maybe EntityValidationError))
forall (m :: * -> *) (t :: * -> *) a b.
(MonadUnliftIO m, Traversable t) =>
t a -> (a -> m b) -> m (t b)
IO.pooledForConcurrently Vector (Hash32, TempEntity)
entities \(Hash32
hash, TempEntity
entity) -> do
Maybe EntityValidationError -> IO (Maybe EntityValidationError)
forall (m :: * -> *) a. MonadIO m => a -> m a
IO.evaluate (Maybe EntityValidationError -> IO (Maybe EntityValidationError))
-> Maybe EntityValidationError -> IO (Maybe EntityValidationError)
forall a b. (a -> b) -> a -> b
$ Hash32 -> TempEntity -> Maybe EntityValidationError
EV.validateTempEntity Hash32
hash TempEntity
entity
Vector EntityValidationError
-> (EntityValidationError -> ExceptT SyncErr IO ())
-> ExceptT SyncErr IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ Vector EntityValidationError
mismatches \case
err :: EntityValidationError
err@(Share.EntityHashMismatch EntityType
et (Share.HashMismatchForEntity {Hash32
supplied :: Hash32
$sel:supplied:HashMismatchForEntity :: HashMismatchForEntity -> Hash32
supplied, Hash32
computed :: Hash32
$sel:computed:HashMismatchForEntity :: HashMismatchForEntity -> Hash32
computed})) ->
let expectedMismatches :: Map Hash32 Hash32
expectedMismatches = case EntityType
et of
EntityType
Share.TermComponentType -> Map Hash32 Hash32
expectedComponentHashMismatches
EntityType
Share.DeclComponentType -> Map Hash32 Hash32
expectedComponentHashMismatches
EntityType
Share.CausalType -> Map Hash32 Hash32
expectedCausalHashMismatches
EntityType
_ -> Map Hash32 Hash32
forall a. Monoid a => a
mempty
in case Hash32 -> Map Hash32 Hash32 -> Maybe Hash32
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup Hash32
supplied Map Hash32 Hash32
expectedMismatches of
Just Hash32
expected
| Hash32
expected Hash32 -> Hash32 -> Bool
forall a. Eq a => a -> a -> Bool
== Hash32
computed -> () -> ExceptT SyncErr IO ()
forall a. a -> ExceptT SyncErr IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Maybe Hash32
_ -> do
SyncErr -> ExceptT SyncErr IO ()
forall a. SyncErr -> ExceptT SyncErr IO a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> ExceptT SyncErr IO ())
-> (EntityValidationError -> SyncErr)
-> EntityValidationError
-> ExceptT SyncErr IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (EntityValidationError -> PullError)
-> EntityValidationError
-> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DownloadEntitiesError -> PullError
SyncV2.PullError'DownloadEntities (DownloadEntitiesError -> PullError)
-> (EntityValidationError -> DownloadEntitiesError)
-> EntityValidationError
-> PullError
forall b c a. (b -> c) -> (a -> b) -> a -> c
. EntityValidationError -> DownloadEntitiesError
SyncV2.DownloadEntitiesEntityValidationFailure (EntityValidationError -> ExceptT SyncErr IO ())
-> EntityValidationError -> ExceptT SyncErr IO ()
forall a b. (a -> b) -> a -> b
$ EntityValidationError
err
EntityValidationError
err -> do
SyncErr -> ExceptT SyncErr IO ()
forall a. SyncErr -> ExceptT SyncErr IO a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> ExceptT SyncErr IO ())
-> (EntityValidationError -> SyncErr)
-> EntityValidationError
-> ExceptT SyncErr IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (EntityValidationError -> PullError)
-> EntityValidationError
-> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DownloadEntitiesError -> PullError
SyncV2.PullError'DownloadEntities (DownloadEntitiesError -> PullError)
-> (EntityValidationError -> DownloadEntitiesError)
-> EntityValidationError
-> PullError
forall b c a. (b -> c) -> (a -> b) -> a -> c
. EntityValidationError -> DownloadEntitiesError
SyncV2.DownloadEntitiesEntityValidationFailure (EntityValidationError -> ExceptT SyncErr IO ())
-> EntityValidationError -> ExceptT SyncErr IO ()
forall a b. (a -> b) -> a -> b
$ EntityValidationError
err
syncUnsortedStream ::
ProgressCallbacks ->
Bool ->
(Codebase.Codebase IO v a) ->
Stream () SyncV2.EntityChunk ->
StreamM ()
syncUnsortedStream :: forall v a.
ProgressCallbacks
-> Bool
-> Codebase IO v a
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
syncUnsortedStream (ProgressCallbacks {Int -> IO ()
$sel:setTotal:ProgressCallbacks :: ProgressCallbacks -> Int -> IO ()
setTotal :: Int -> IO ()
setTotal, Int -> IO ()
$sel:downloadCounter:ProgressCallbacks :: ProgressCallbacks -> Int -> IO ()
downloadCounter :: Int -> IO ()
downloadCounter, IO ()
$sel:doneDownloading:ProgressCallbacks :: ProgressCallbacks -> IO ()
doneDownloading :: IO ()
doneDownloading, Int -> IO ()
$sel:importCounter:ProgressCallbacks :: ProgressCallbacks -> Int -> IO ()
importCounter :: Int -> IO ()
importCounter}) Bool
shouldValidate Codebase IO v a
codebase Stream () EntityChunk
stream = do
Vector (Hash32, TempEntity)
allEntities <-
ConduitT
()
Void
(ExceptT SyncErr (ResourceT IO))
(Vector (Hash32, TempEntity))
-> ExceptT SyncErr (ResourceT IO) (Vector (Hash32, TempEntity))
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
C.runConduit (ConduitT
()
Void
(ExceptT SyncErr (ResourceT IO))
(Vector (Hash32, TempEntity))
-> ExceptT SyncErr (ResourceT IO) (Vector (Hash32, TempEntity)))
-> ConduitT
()
Void
(ExceptT SyncErr (ResourceT IO))
(Vector (Hash32, TempEntity))
-> ExceptT SyncErr (ResourceT IO) (Vector (Hash32, TempEntity))
forall a b. (a -> b) -> a -> b
$
Stream () EntityChunk
stream
Stream () EntityChunk
-> ConduitT
EntityChunk
Void
(ExceptT SyncErr (ResourceT IO))
(Vector (Hash32, TempEntity))
-> ConduitT
()
Void
(ExceptT SyncErr (ResourceT IO))
(Vector (Hash32, TempEntity))
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| (EntityChunk -> ExceptT SyncErr (ResourceT IO) ())
-> ConduitT
EntityChunk EntityChunk (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a. Monad m => (a -> m ()) -> ConduitT a a m ()
C.iterM (\EntityChunk
_ -> IO () -> ExceptT SyncErr (ResourceT IO) ()
forall a. IO a -> ExceptT SyncErr (ResourceT IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ExceptT SyncErr (ResourceT IO) ())
-> IO () -> ExceptT SyncErr (ResourceT IO) ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
downloadCounter Int
1)
ConduitT
EntityChunk EntityChunk (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT
EntityChunk
Void
(ExceptT SyncErr (ResourceT IO))
(Vector (Hash32, TempEntity))
-> ConduitT
EntityChunk
Void
(ExceptT SyncErr (ResourceT IO))
(Vector (Hash32, TempEntity))
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| Int
-> ConduitT
EntityChunk [EntityChunk] (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a. Monad m => Int -> ConduitT a [a] m ()
CL.chunksOf Int
batchSize
ConduitT
EntityChunk [EntityChunk] (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT
[EntityChunk]
Void
(ExceptT SyncErr (ResourceT IO))
(Vector (Hash32, TempEntity))
-> ConduitT
EntityChunk
Void
(ExceptT SyncErr (ResourceT IO))
(Vector (Hash32, TempEntity))
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| Codebase IO v a
-> Stream [EntityChunk] (Vector (Hash32, TempEntity))
forall v a.
Codebase IO v a
-> Stream [EntityChunk] (Vector (Hash32, TempEntity))
unpackChunks Codebase IO v a
codebase
Stream [EntityChunk] (Vector (Hash32, TempEntity))
-> ConduitT
(Vector (Hash32, TempEntity))
Void
(ExceptT SyncErr (ResourceT IO))
(Vector (Hash32, TempEntity))
-> ConduitT
[EntityChunk]
Void
(ExceptT SyncErr (ResourceT IO))
(Vector (Hash32, TempEntity))
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| Stream (Vector (Hash32, TempEntity)) (Vector (Hash32, TempEntity))
validateBatch
Stream (Vector (Hash32, TempEntity)) (Vector (Hash32, TempEntity))
-> ConduitT
(Vector (Hash32, TempEntity))
Void
(ExceptT SyncErr (ResourceT IO))
(Vector (Hash32, TempEntity))
-> ConduitT
(Vector (Hash32, TempEntity))
Void
(ExceptT SyncErr (ResourceT IO))
(Vector (Hash32, TempEntity))
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| ConduitT
(Vector (Hash32, TempEntity))
(Hash32, TempEntity)
(ExceptT SyncErr (ResourceT IO))
()
ConduitT
(Vector (Hash32, TempEntity))
(Element (Vector (Hash32, TempEntity)))
(ExceptT SyncErr (ResourceT IO))
()
forall (m :: * -> *) mono.
(Monad m, MonoFoldable mono) =>
ConduitT mono (Element mono) m ()
C.concat
ConduitT
(Vector (Hash32, TempEntity))
(Hash32, TempEntity)
(ExceptT SyncErr (ResourceT IO))
()
-> ConduitT
(Hash32, TempEntity)
Void
(ExceptT SyncErr (ResourceT IO))
(Vector (Hash32, TempEntity))
-> ConduitT
(Vector (Hash32, TempEntity))
Void
(ExceptT SyncErr (ResourceT IO))
(Vector (Hash32, TempEntity))
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| forall (v :: * -> *) a (m :: * -> *) o.
(Vector v a, PrimMonad m) =>
ConduitT a o m (v a)
C.sinkVector @Vector
IO () -> ExceptT SyncErr (ResourceT IO) ()
forall a. IO a -> ExceptT SyncErr (ResourceT IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO ()
doneDownloading
IO () -> ExceptT SyncErr (ResourceT IO) ()
forall a. IO a -> ExceptT SyncErr (ResourceT IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ExceptT SyncErr (ResourceT IO) ())
-> IO () -> ExceptT SyncErr (ResourceT IO) ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
setTotal (Vector (Hash32, TempEntity) -> Int
forall a. Vector a -> Int
Vector.length Vector (Hash32, TempEntity)
allEntities)
let sortedEntities :: [(Hash32, TempEntity)]
sortedEntities = Vector (Hash32, TempEntity) -> [(Hash32, TempEntity)]
forall (f :: * -> *).
(Foldable f, Functor f) =>
f (Hash32, TempEntity) -> [(Hash32, TempEntity)]
sortDependencyFirst Vector (Hash32, TempEntity)
allEntities
IO () -> ExceptT SyncErr (ResourceT IO) ()
forall a. IO a -> ExceptT SyncErr (ResourceT IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ExceptT SyncErr (ResourceT IO) ())
-> IO () -> ExceptT SyncErr (ResourceT IO) ()
forall a b. (a -> b) -> a -> b
$ Codebase IO v a -> Transaction () -> IO ()
forall (m :: * -> *) v a b.
MonadIO m =>
Codebase m v a -> Transaction b -> m b
Codebase.runTransaction Codebase IO v a
codebase (Transaction () -> IO ()) -> Transaction () -> IO ()
forall a b. (a -> b) -> a -> b
$ [(Hash32, TempEntity)]
-> ((Hash32, TempEntity)
-> Transaction (Either CausalHashId ObjectId))
-> Transaction ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ [(Hash32, TempEntity)]
sortedEntities \(Hash32
hash, TempEntity
entity) -> do
Either CausalHashId ObjectId
r <- HashHandle
-> Hash32
-> TempEntity
-> Transaction (Either CausalHashId ObjectId)
Q.saveTempEntityInMain HashHandle
v2HashHandle Hash32
hash TempEntity
entity
IO () -> Transaction ()
forall a. HasCallStack => IO a -> Transaction a
Sqlite.unsafeIO (IO () -> Transaction ()) -> IO () -> Transaction ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
importCounter Int
1
pure Either CausalHashId ObjectId
r
where
batchSize :: Int
batchSize :: Int
batchSize = Int
5000
validateBatch :: Stream (Vector (Hash32, TempEntity)) (Vector (Hash32, TempEntity))
validateBatch :: Stream (Vector (Hash32, TempEntity)) (Vector (Hash32, TempEntity))
validateBatch = (Vector (Hash32, TempEntity) -> ExceptT SyncErr (ResourceT IO) ())
-> Stream
(Vector (Hash32, TempEntity)) (Vector (Hash32, TempEntity))
forall (m :: * -> *) a. Monad m => (a -> m ()) -> ConduitT a a m ()
C.iterM \Vector (Hash32, TempEntity)
entities -> do
Bool
-> ExceptT SyncErr (ResourceT IO) ()
-> ExceptT SyncErr (ResourceT IO) ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
shouldValidate ((IO (Either SyncErr ()) -> ResourceT IO (Either SyncErr ()))
-> ExceptT SyncErr IO () -> ExceptT SyncErr (ResourceT IO) ()
forall (m :: * -> *) e a (n :: * -> *) e' b.
(m (Either e a) -> n (Either e' b))
-> ExceptT e m a -> ExceptT e' n b
mapExceptT IO (Either SyncErr ()) -> ResourceT IO (Either SyncErr ())
forall (m :: * -> *) a. Monad m => m a -> ResourceT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ExceptT SyncErr IO () -> ExceptT SyncErr (ResourceT IO) ())
-> ExceptT SyncErr IO () -> ExceptT SyncErr (ResourceT IO) ()
forall a b. (a -> b) -> a -> b
$ Vector (Hash32, TempEntity) -> ExceptT SyncErr IO ()
batchValidateEntities Vector (Hash32, TempEntity)
entities)
syncSortedStream ::
ProgressCallbacks ->
Bool ->
(Codebase.Codebase IO v a) ->
Stream () SyncV2.EntityChunk ->
StreamM ()
syncSortedStream :: forall v a.
ProgressCallbacks
-> Bool
-> Codebase IO v a
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
syncSortedStream (ProgressCallbacks {Int -> IO ()
$sel:downloadCounter:ProgressCallbacks :: ProgressCallbacks -> Int -> IO ()
downloadCounter :: Int -> IO ()
downloadCounter, IO ()
$sel:doneDownloading:ProgressCallbacks :: ProgressCallbacks -> IO ()
doneDownloading :: IO ()
doneDownloading, Int -> IO ()
$sel:importCounter:ProgressCallbacks :: ProgressCallbacks -> Int -> IO ()
importCounter :: Int -> IO ()
importCounter}) Bool
shouldValidate Codebase IO v a
codebase Stream () EntityChunk
stream = do
(ConduitT EntityChunk Void (ExceptT SyncErr (ResourceT IO)) ()
downloaderSink, Stream () EntityChunk
downloaderSource) <- Int
-> ExceptT
SyncErr
(ResourceT IO)
(ConduitT EntityChunk Void (ExceptT SyncErr (ResourceT IO)) (),
Stream () EntityChunk)
forall (m :: * -> *) i void1 void2.
MonadIO m =>
Int -> m (ConduitT i void1 m (), ConduitT void2 i m ())
parallelSinkAndSource (Int
3 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
batchSize)
(ConduitT
(Vector (Hash32, TempEntity))
Void
(ExceptT SyncErr (ResourceT IO))
()
unpackerSink, ConduitT
()
(Vector (Hash32, TempEntity))
(ExceptT SyncErr (ResourceT IO))
()
unpackerSource) <- Int
-> ExceptT
SyncErr
(ResourceT IO)
(ConduitT
(Vector (Hash32, TempEntity))
Void
(ExceptT SyncErr (ResourceT IO))
(),
ConduitT
()
(Vector (Hash32, TempEntity))
(ExceptT SyncErr (ResourceT IO))
())
forall (m :: * -> *) i void1 void2.
MonadIO m =>
Int -> m (ConduitT i void1 m (), ConduitT void2 i m ())
parallelSinkAndSource Int
5
let handler :: Stream (Vector (Hash32, TempEntity)) o
handler :: forall o. Stream (Vector (Hash32, TempEntity)) o
handler = (Vector (Hash32, TempEntity) -> ExceptT SyncErr (ResourceT IO) ())
-> ConduitT
(Vector (Hash32, TempEntity)) o (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a o.
Monad m =>
(a -> m ()) -> ConduitT a o m ()
C.mapM_C \Vector (Hash32, TempEntity)
entityBatch -> do
Bool
-> Codebase IO v a
-> Vector (Hash32, TempEntity)
-> ExceptT SyncErr (ResourceT IO) ()
forall v a.
Bool
-> Codebase IO v a
-> Vector (Hash32, TempEntity)
-> ExceptT SyncErr (ResourceT IO) ()
validateAndSave Bool
shouldValidate Codebase IO v a
codebase Vector (Hash32, TempEntity)
entityBatch
IO () -> ExceptT SyncErr (ResourceT IO) ()
forall a. IO a -> ExceptT SyncErr (ResourceT IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ExceptT SyncErr (ResourceT IO) ())
-> IO () -> ExceptT SyncErr (ResourceT IO) ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
importCounter (Vector (Hash32, TempEntity) -> Int
forall a. Vector a -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length Vector (Hash32, TempEntity)
entityBatch)
let downloadC :: ConduitT () Void (ExceptT SyncErr (ResourceT IO)) ()
downloadC = Stream () EntityChunk
stream Stream () EntityChunk
-> ConduitT EntityChunk Void (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT () Void (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| ConduitT EntityChunk Void (ExceptT SyncErr (ResourceT IO)) ()
downloaderSink
let saverC :: ConduitT () Void (ExceptT SyncErr (ResourceT IO)) ()
saverC =
Stream () EntityChunk
downloaderSource
Stream () EntityChunk
-> ConduitT EntityChunk Void (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT () Void (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| Int
-> ConduitT
EntityChunk [EntityChunk] (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a. Monad m => Int -> ConduitT a [a] m ()
CL.chunksOf Int
batchSize
ConduitT
EntityChunk [EntityChunk] (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT [EntityChunk] Void (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT EntityChunk Void (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| Codebase IO v a
-> Stream [EntityChunk] (Vector (Hash32, TempEntity))
forall v a.
Codebase IO v a
-> Stream [EntityChunk] (Vector (Hash32, TempEntity))
unpackChunks Codebase IO v a
codebase
Stream [EntityChunk] (Vector (Hash32, TempEntity))
-> ConduitT
(Vector (Hash32, TempEntity))
Void
(ExceptT SyncErr (ResourceT IO))
()
-> ConduitT [EntityChunk] Void (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| (Vector (Hash32, TempEntity) -> ExceptT SyncErr (ResourceT IO) ())
-> Stream
(Vector (Hash32, TempEntity)) (Vector (Hash32, TempEntity))
forall (m :: * -> *) a. Monad m => (a -> m ()) -> ConduitT a a m ()
C.iterM (IO () -> ExceptT SyncErr (ResourceT IO) ()
forall a. IO a -> ExceptT SyncErr (ResourceT IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ExceptT SyncErr (ResourceT IO) ())
-> (Vector (Hash32, TempEntity) -> IO ())
-> Vector (Hash32, TempEntity)
-> ExceptT SyncErr (ResourceT IO) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> IO ()
downloadCounter (Int -> IO ())
-> (Vector (Hash32, TempEntity) -> Int)
-> Vector (Hash32, TempEntity)
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Vector (Hash32, TempEntity) -> Int
forall a. Vector a -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length)
Stream (Vector (Hash32, TempEntity)) (Vector (Hash32, TempEntity))
-> ConduitT
(Vector (Hash32, TempEntity))
Void
(ExceptT SyncErr (ResourceT IO))
()
-> ConduitT
(Vector (Hash32, TempEntity))
Void
(ExceptT SyncErr (ResourceT IO))
()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| (ConduitT
(Vector (Hash32, TempEntity))
Void
(ExceptT SyncErr (ResourceT IO))
()
unpackerSink ConduitT
(Vector (Hash32, TempEntity))
Void
(ExceptT SyncErr (ResourceT IO))
()
-> ConduitT
(Vector (Hash32, TempEntity))
Void
(ExceptT SyncErr (ResourceT IO))
()
-> ConduitT
(Vector (Hash32, TempEntity))
Void
(ExceptT SyncErr (ResourceT IO))
()
forall a b.
ConduitT
(Vector (Hash32, TempEntity))
Void
(ExceptT SyncErr (ResourceT IO))
a
-> ConduitT
(Vector (Hash32, TempEntity))
Void
(ExceptT SyncErr (ResourceT IO))
b
-> ConduitT
(Vector (Hash32, TempEntity))
Void
(ExceptT SyncErr (ResourceT IO))
b
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> IO ()
-> ConduitT
(Vector (Hash32, TempEntity))
Void
(ExceptT SyncErr (ResourceT IO))
()
forall a.
IO a
-> ConduitT
(Vector (Hash32, TempEntity))
Void
(ExceptT SyncErr (ResourceT IO))
a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO ()
doneDownloading)
let handlerC :: ConduitT () Void (ExceptT SyncErr (ResourceT IO)) ()
handlerC =
ConduitT
()
(Vector (Hash32, TempEntity))
(ExceptT SyncErr (ResourceT IO))
()
unpackerSource
ConduitT
()
(Vector (Hash32, TempEntity))
(ExceptT SyncErr (ResourceT IO))
()
-> ConduitT
(Vector (Hash32, TempEntity))
Void
(ExceptT SyncErr (ResourceT IO))
()
-> ConduitT () Void (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| ConduitT
(Vector (Hash32, TempEntity))
Void
(ExceptT SyncErr (ResourceT IO))
()
forall o. Stream (Vector (Hash32, TempEntity)) o
handler
ResourceT IO (Either SyncErr ())
-> ExceptT SyncErr (ResourceT IO) ()
forall e (m :: * -> *) a. m (Either e a) -> ExceptT e m a
ExceptT (ResourceT IO (Either SyncErr ())
-> ExceptT SyncErr (ResourceT IO) ())
-> (Conc (ResourceT IO) (Either SyncErr ())
-> ResourceT IO (Either SyncErr ()))
-> Conc (ResourceT IO) (Either SyncErr ())
-> ExceptT SyncErr (ResourceT IO) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Conc (ResourceT IO) (Either SyncErr ())
-> ResourceT IO (Either SyncErr ())
forall (m :: * -> *) a. MonadUnliftIO m => Conc m a -> m a
Async.runConc (Conc (ResourceT IO) (Either SyncErr ())
-> ExceptT SyncErr (ResourceT IO) ())
-> Conc (ResourceT IO) (Either SyncErr ())
-> ExceptT SyncErr (ResourceT IO) ()
forall a b. (a -> b) -> a -> b
$ do
Either SyncErr ()
a <- ResourceT IO (Either SyncErr ())
-> Conc (ResourceT IO) (Either SyncErr ())
forall (m :: * -> *) a. m a -> Conc m a
Async.conc (ResourceT IO (Either SyncErr ())
-> Conc (ResourceT IO) (Either SyncErr ()))
-> (ExceptT SyncErr (ResourceT IO) ()
-> ResourceT IO (Either SyncErr ()))
-> ExceptT SyncErr (ResourceT IO) ()
-> Conc (ResourceT IO) (Either SyncErr ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ExceptT SyncErr (ResourceT IO) ()
-> ResourceT IO (Either SyncErr ())
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT SyncErr (ResourceT IO) ()
-> Conc (ResourceT IO) (Either SyncErr ()))
-> ExceptT SyncErr (ResourceT IO) ()
-> Conc (ResourceT IO) (Either SyncErr ())
forall a b. (a -> b) -> a -> b
$ ConduitT () Void (ExceptT SyncErr (ResourceT IO)) ()
-> ExceptT SyncErr (ResourceT IO) ()
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
C.runConduit ConduitT () Void (ExceptT SyncErr (ResourceT IO)) ()
downloadC
Either SyncErr ()
b <- ResourceT IO (Either SyncErr ())
-> Conc (ResourceT IO) (Either SyncErr ())
forall (m :: * -> *) a. m a -> Conc m a
Async.conc (ResourceT IO (Either SyncErr ())
-> Conc (ResourceT IO) (Either SyncErr ()))
-> (ExceptT SyncErr (ResourceT IO) ()
-> ResourceT IO (Either SyncErr ()))
-> ExceptT SyncErr (ResourceT IO) ()
-> Conc (ResourceT IO) (Either SyncErr ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ExceptT SyncErr (ResourceT IO) ()
-> ResourceT IO (Either SyncErr ())
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT SyncErr (ResourceT IO) ()
-> Conc (ResourceT IO) (Either SyncErr ()))
-> ExceptT SyncErr (ResourceT IO) ()
-> Conc (ResourceT IO) (Either SyncErr ())
forall a b. (a -> b) -> a -> b
$ ConduitT () Void (ExceptT SyncErr (ResourceT IO)) ()
-> ExceptT SyncErr (ResourceT IO) ()
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
C.runConduit ConduitT () Void (ExceptT SyncErr (ResourceT IO)) ()
saverC
Either SyncErr ()
c <- ResourceT IO (Either SyncErr ())
-> Conc (ResourceT IO) (Either SyncErr ())
forall (m :: * -> *) a. m a -> Conc m a
Async.conc (ResourceT IO (Either SyncErr ())
-> Conc (ResourceT IO) (Either SyncErr ()))
-> (ExceptT SyncErr (ResourceT IO) ()
-> ResourceT IO (Either SyncErr ()))
-> ExceptT SyncErr (ResourceT IO) ()
-> Conc (ResourceT IO) (Either SyncErr ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ExceptT SyncErr (ResourceT IO) ()
-> ResourceT IO (Either SyncErr ())
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT SyncErr (ResourceT IO) ()
-> Conc (ResourceT IO) (Either SyncErr ()))
-> ExceptT SyncErr (ResourceT IO) ()
-> Conc (ResourceT IO) (Either SyncErr ())
forall a b. (a -> b) -> a -> b
$ ConduitT () Void (ExceptT SyncErr (ResourceT IO)) ()
-> ExceptT SyncErr (ResourceT IO) ()
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
C.runConduit ConduitT () Void (ExceptT SyncErr (ResourceT IO)) ()
handlerC
pure (Either SyncErr ()
a Either SyncErr () -> Either SyncErr () -> Either SyncErr ()
forall a b.
Either SyncErr a -> Either SyncErr b -> Either SyncErr b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Either SyncErr ()
b Either SyncErr () -> Either SyncErr () -> Either SyncErr ()
forall a b.
Either SyncErr a -> Either SyncErr b -> Either SyncErr b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Either SyncErr ()
c)
where
batchSize :: Int
batchSize = Int
1000
sortDependencyFirst :: (Foldable f, Functor f) => f (Hash32, TempEntity) -> [(Hash32, TempEntity)]
sortDependencyFirst :: forall (f :: * -> *).
(Foldable f, Functor f) =>
f (Hash32, TempEntity) -> [(Hash32, TempEntity)]
sortDependencyFirst f (Hash32, TempEntity)
entities = do
let adjList :: f ((Hash32, TempEntity), Hash32, [Hash32])
adjList = f (Hash32, TempEntity)
entities f (Hash32, TempEntity)
-> ((Hash32, TempEntity)
-> ((Hash32, TempEntity), Hash32, [Hash32]))
-> f ((Hash32, TempEntity), Hash32, [Hash32])
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \(Hash32
hash32, TempEntity
entity) -> ((Hash32
hash32, TempEntity
entity), Hash32
hash32, Set Hash32 -> [Hash32]
forall a. Set a -> [a]
Set.toList (Set Hash32 -> [Hash32]) -> Set Hash32 -> [Hash32]
forall a b. (a -> b) -> a -> b
$ Entity Text Hash32 Hash32 -> Set Hash32
forall hash text noSyncHash.
Ord hash =>
Entity text noSyncHash hash -> Set hash
Share.entityDependencies (TempEntity -> Entity Text Hash32 Hash32
tempEntityToEntity TempEntity
entity))
(Graph
graph, Int -> ((Hash32, TempEntity), Hash32, [Hash32])
vertexInfo, Hash32 -> Maybe Int
_vertexForKey) = [((Hash32, TempEntity), Hash32, [Hash32])]
-> (Graph, Int -> ((Hash32, TempEntity), Hash32, [Hash32]),
Hash32 -> Maybe Int)
forall key node.
Ord key =>
[(node, key, [key])]
-> (Graph, Int -> (node, key, [key]), key -> Maybe Int)
Graph.graphFromEdges (f ((Hash32, TempEntity), Hash32, [Hash32])
-> [((Hash32, TempEntity), Hash32, [Hash32])]
forall a. f a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
Foldable.toList f ((Hash32, TempEntity), Hash32, [Hash32])
adjList)
in Graph -> [Int]
Graph.reverseTopSort Graph
graph [Int] -> (Int -> (Hash32, TempEntity)) -> [(Hash32, TempEntity)]
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \Int
v -> (Getting
(Hash32, TempEntity)
((Hash32, TempEntity), Hash32, [Hash32])
(Hash32, TempEntity)
-> ((Hash32, TempEntity), Hash32, [Hash32]) -> (Hash32, TempEntity)
forall s (m :: * -> *) a. MonadReader s m => Getting a s a -> m a
view Getting
(Hash32, TempEntity)
((Hash32, TempEntity), Hash32, [Hash32])
(Hash32, TempEntity)
forall s t a b. Field1 s t a b => Lens s t a b
Lens
((Hash32, TempEntity), Hash32, [Hash32])
((Hash32, TempEntity), Hash32, [Hash32])
(Hash32, TempEntity)
(Hash32, TempEntity)
_1 (((Hash32, TempEntity), Hash32, [Hash32]) -> (Hash32, TempEntity))
-> ((Hash32, TempEntity), Hash32, [Hash32]) -> (Hash32, TempEntity)
forall a b. (a -> b) -> a -> b
$ Int -> ((Hash32, TempEntity), Hash32, [Hash32])
vertexInfo Int
v)
unpackChunk :: SyncV2.EntityChunk -> ExceptT SyncErr Sqlite.Transaction (Maybe (Hash32, TempEntity))
unpackChunk :: EntityChunk
-> ExceptT SyncErr Transaction (Maybe (Hash32, TempEntity))
unpackChunk = \case
SyncV2.EntityChunk {Hash32
hash :: Hash32
$sel:hash:EntityChunk :: EntityChunk -> Hash32
hash, $sel:entityCBOR:EntityChunk :: EntityChunk -> CBORBytes TempEntity
entityCBOR = CBORBytes TempEntity
entityBytes} -> do
Transaction (Maybe EntityLocation)
-> ExceptT SyncErr Transaction (Maybe EntityLocation)
forall (m :: * -> *) a. Monad m => m a -> ExceptT SyncErr m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (Hash32 -> Transaction (Maybe EntityLocation)
Q.entityLocation Hash32
hash) ExceptT SyncErr Transaction (Maybe EntityLocation)
-> (Maybe EntityLocation
-> ExceptT SyncErr Transaction (Maybe (Hash32, TempEntity)))
-> ExceptT SyncErr Transaction (Maybe (Hash32, TempEntity))
forall a b.
ExceptT SyncErr Transaction a
-> (a -> ExceptT SyncErr Transaction b)
-> ExceptT SyncErr Transaction b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Just EntityLocation
Q.EntityInMainStorage -> Maybe (Hash32, TempEntity)
-> ExceptT SyncErr Transaction (Maybe (Hash32, TempEntity))
forall a. a -> ExceptT SyncErr Transaction a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (Hash32, TempEntity)
forall a. Maybe a
Nothing
Maybe EntityLocation
_ -> do
((Hash32, TempEntity) -> Maybe (Hash32, TempEntity)
forall a. a -> Maybe a
Just ((Hash32, TempEntity) -> Maybe (Hash32, TempEntity))
-> (TempEntity -> (Hash32, TempEntity))
-> TempEntity
-> Maybe (Hash32, TempEntity)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Hash32
hash,)) (TempEntity -> Maybe (Hash32, TempEntity))
-> ExceptT SyncErr Transaction TempEntity
-> ExceptT SyncErr Transaction (Maybe (Hash32, TempEntity))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> CBORBytes TempEntity -> ExceptT SyncErr Transaction TempEntity
unpackEntity CBORBytes TempEntity
entityBytes
where
unpackEntity :: (CBORBytes TempEntity) -> ExceptT SyncErr Sqlite.Transaction TempEntity
unpackEntity :: CBORBytes TempEntity -> ExceptT SyncErr Transaction TempEntity
unpackEntity CBORBytes TempEntity
entityBytes = do
case CBORBytes TempEntity -> Either DeserialiseFailure TempEntity
forall t. Serialise t => CBORBytes t -> Either DeserialiseFailure t
CBOR.deserialiseOrFailCBORBytes CBORBytes TempEntity
entityBytes of
Left DeserialiseFailure
err -> do SyncErr -> ExceptT SyncErr Transaction TempEntity
forall a. SyncErr -> ExceptT SyncErr Transaction a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> ExceptT SyncErr Transaction TempEntity)
-> SyncErr -> ExceptT SyncErr Transaction TempEntity
forall a b. (a -> b) -> a -> b
$ (PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (SyncError -> PullError) -> SyncError -> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncError -> PullError
SyncV2.PullError'Sync (SyncError -> SyncErr) -> SyncError -> SyncErr
forall a b. (a -> b) -> a -> b
$ DeserialiseFailure -> SyncError
SyncV2.SyncErrorDeserializationFailure DeserialiseFailure
err)
Right TempEntity
entity -> TempEntity -> ExceptT SyncErr Transaction TempEntity
forall a. a -> ExceptT SyncErr Transaction a
forall (f :: * -> *) a. Applicative f => a -> f a
pure TempEntity
entity
unpackChunks :: Codebase.Codebase IO v a -> Stream [SyncV2.EntityChunk] (Vector (Hash32, TempEntity))
unpackChunks :: forall v a.
Codebase IO v a
-> Stream [EntityChunk] (Vector (Hash32, TempEntity))
unpackChunks Codebase IO v a
codebase = ([EntityChunk]
-> ExceptT SyncErr (ResourceT IO) (Vector (Hash32, TempEntity)))
-> Stream [EntityChunk] (Vector (Hash32, TempEntity))
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> ConduitT a b m ()
C.mapM \[EntityChunk]
xs -> ResourceT IO (Either SyncErr (Vector (Hash32, TempEntity)))
-> ExceptT SyncErr (ResourceT IO) (Vector (Hash32, TempEntity))
forall e (m :: * -> *) a. m (Either e a) -> ExceptT e m a
ExceptT (ResourceT IO (Either SyncErr (Vector (Hash32, TempEntity)))
-> ExceptT SyncErr (ResourceT IO) (Vector (Hash32, TempEntity)))
-> (ExceptT SyncErr Transaction (Vector (Hash32, TempEntity))
-> ResourceT IO (Either SyncErr (Vector (Hash32, TempEntity))))
-> ExceptT SyncErr Transaction (Vector (Hash32, TempEntity))
-> ExceptT SyncErr (ResourceT IO) (Vector (Hash32, TempEntity))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO (Either SyncErr (Vector (Hash32, TempEntity)))
-> ResourceT IO (Either SyncErr (Vector (Hash32, TempEntity)))
forall (m :: * -> *) a. Monad m => m a -> ResourceT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (IO (Either SyncErr (Vector (Hash32, TempEntity)))
-> ResourceT IO (Either SyncErr (Vector (Hash32, TempEntity))))
-> (ExceptT SyncErr Transaction (Vector (Hash32, TempEntity))
-> IO (Either SyncErr (Vector (Hash32, TempEntity))))
-> ExceptT SyncErr Transaction (Vector (Hash32, TempEntity))
-> ResourceT IO (Either SyncErr (Vector (Hash32, TempEntity)))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Codebase IO v a
-> ExceptT SyncErr Transaction (Vector (Hash32, TempEntity))
-> IO (Either SyncErr (Vector (Hash32, TempEntity)))
forall (m :: * -> *) v a e b.
MonadIO m =>
Codebase m v a -> ExceptT e Transaction b -> m (Either e b)
Codebase.runTransactionExceptT Codebase IO v a
codebase (ExceptT SyncErr Transaction (Vector (Hash32, TempEntity))
-> ExceptT SyncErr (ResourceT IO) (Vector (Hash32, TempEntity)))
-> ExceptT SyncErr Transaction (Vector (Hash32, TempEntity))
-> ExceptT SyncErr (ResourceT IO) (Vector (Hash32, TempEntity))
forall a b. (a -> b) -> a -> b
$ do
[EntityChunk]
-> (EntityChunk
-> ExceptT SyncErr Transaction (Maybe (Hash32, TempEntity)))
-> ExceptT SyncErr Transaction [Maybe (Hash32, TempEntity)]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
t a -> (a -> f b) -> f (t b)
for [EntityChunk]
xs EntityChunk
-> ExceptT SyncErr Transaction (Maybe (Hash32, TempEntity))
unpackChunk
ExceptT SyncErr Transaction [Maybe (Hash32, TempEntity)]
-> ([Maybe (Hash32, TempEntity)] -> [(Hash32, TempEntity)])
-> ExceptT SyncErr Transaction [(Hash32, TempEntity)]
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> [Maybe (Hash32, TempEntity)] -> [(Hash32, TempEntity)]
forall a. [Maybe a] -> [a]
catMaybes
ExceptT SyncErr Transaction [(Hash32, TempEntity)]
-> ([(Hash32, TempEntity)] -> Vector (Hash32, TempEntity))
-> ExceptT SyncErr Transaction (Vector (Hash32, TempEntity))
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> [(Hash32, TempEntity)] -> Vector (Hash32, TempEntity)
forall a. [a] -> Vector a
Vector.fromList
streamIntoCodebase ::
ProgressCallbacks ->
Bool ->
Codebase.Codebase IO v a ->
SyncV2.StreamInitInfo ->
Stream () SyncV2.EntityChunk ->
StreamM ()
streamIntoCodebase :: forall v a.
ProgressCallbacks
-> Bool
-> Codebase IO v a
-> StreamInitInfo
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
streamIntoCodebase ProgressCallbacks
progressCounters Bool
shouldValidate Codebase IO v a
codebase SyncV2.StreamInitInfo {Version
version :: Version
$sel:version:StreamInitInfo :: StreamInitInfo -> Version
version, EntitySorting
entitySorting :: EntitySorting
$sel:entitySorting:StreamInitInfo :: StreamInitInfo -> EntitySorting
entitySorting} Stream () EntityChunk
stream = do
case Version
version of
(SyncV2.Version Word16
1) -> () -> ExceptT SyncErr (ResourceT IO) ()
forall a. a -> ExceptT SyncErr (ResourceT IO) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Version
v -> SyncErr -> ExceptT SyncErr (ResourceT IO) ()
forall a. SyncErr -> ExceptT SyncErr (ResourceT IO) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> ExceptT SyncErr (ResourceT IO) ())
-> (SyncError -> SyncErr)
-> SyncError
-> ExceptT SyncErr (ResourceT IO) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (SyncError -> PullError) -> SyncError -> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncError -> PullError
SyncV2.PullError'Sync (SyncError -> ExceptT SyncErr (ResourceT IO) ())
-> SyncError -> ExceptT SyncErr (ResourceT IO) ()
forall a b. (a -> b) -> a -> b
$ Version -> SyncError
SyncV2.SyncErrorUnsupportedVersion Version
v
case EntitySorting
entitySorting of
EntitySorting
SyncV2.DependenciesFirst -> ProgressCallbacks
-> Bool
-> Codebase IO v a
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
forall v a.
ProgressCallbacks
-> Bool
-> Codebase IO v a
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
syncSortedStream ProgressCallbacks
progressCounters Bool
shouldValidate Codebase IO v a
codebase Stream () EntityChunk
stream
EntitySorting
SyncV2.Unsorted -> ProgressCallbacks
-> Bool
-> Codebase IO v a
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
forall v a.
ProgressCallbacks
-> Bool
-> Codebase IO v a
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
syncUnsortedStream ProgressCallbacks
progressCounters Bool
shouldValidate Codebase IO v a
codebase Stream () EntityChunk
stream
afterSyncChecks :: Codebase.Codebase IO v a -> Hash32 -> ExceptT (SyncError SyncV2.PullError) IO CausalHashId
afterSyncChecks :: forall v a.
Codebase IO v a -> Hash32 -> ExceptT SyncErr IO CausalHashId
afterSyncChecks Codebase IO v a
codebase Hash32
hash = do
CausalHashId
chId <-
IO (Maybe CausalHashId) -> ExceptT SyncErr IO (Maybe CausalHashId)
forall (m :: * -> *) a. Monad m => m a -> ExceptT SyncErr m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (Codebase IO v a -> Hash32 -> IO (Maybe CausalHashId)
forall v a. Codebase IO v a -> Hash32 -> IO (Maybe CausalHashId)
didCausalSuccessfullyImport Codebase IO v a
codebase Hash32
hash) ExceptT SyncErr IO (Maybe CausalHashId)
-> (Maybe CausalHashId -> ExceptT SyncErr IO CausalHashId)
-> ExceptT SyncErr IO CausalHashId
forall a b.
ExceptT SyncErr IO a
-> (a -> ExceptT SyncErr IO b) -> ExceptT SyncErr IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Maybe CausalHashId
Nothing -> do
SyncErr -> ExceptT SyncErr IO CausalHashId
forall a. SyncErr -> ExceptT SyncErr IO a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (PullError -> SyncErr
forall e. e -> SyncError e
SyncError (SyncError -> PullError
SyncV2.PullError'Sync (SyncError -> PullError)
-> (Hash32 -> SyncError) -> Hash32 -> PullError
forall b c a. (b -> c) -> (a -> b) -> a -> c
. CausalHash -> SyncError
SyncV2.SyncErrorExpectedResultNotInMain (CausalHash -> SyncError)
-> (Hash32 -> CausalHash) -> Hash32 -> SyncError
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Hash32 -> CausalHash
hash32ToCausalHash (Hash32 -> PullError) -> Hash32 -> PullError
forall a b. (a -> b) -> a -> b
$ Hash32
hash))
Just CausalHashId
chId -> CausalHashId -> ExceptT SyncErr IO CausalHashId
forall a. a -> ExceptT SyncErr IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure CausalHashId
chId
ExceptT SyncErr IO Bool -> ExceptT SyncErr IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (ExceptT SyncErr IO Bool -> ExceptT SyncErr IO ())
-> ExceptT SyncErr IO Bool -> ExceptT SyncErr IO ()
forall a b. (a -> b) -> a -> b
$ IO Bool -> ExceptT SyncErr IO Bool
forall a. IO a -> ExceptT SyncErr IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Codebase IO v a -> forall x. (Connection -> IO x) -> IO x
forall (m :: * -> *) v a.
Codebase m v a -> forall x. (Connection -> m x) -> m x
Codebase.withConnection Codebase IO v a
codebase Connection -> IO Bool
Sqlite.vacuum)
pure CausalHashId
chId
where
didCausalSuccessfullyImport :: Codebase.Codebase IO v a -> Hash32 -> IO (Maybe (CausalHashId))
didCausalSuccessfullyImport :: forall v a. Codebase IO v a -> Hash32 -> IO (Maybe CausalHashId)
didCausalSuccessfullyImport Codebase IO v a
codebase Hash32
hash = do
let expectedHash :: CausalHash
expectedHash = Hash32 -> CausalHash
hash32ToCausalHash Hash32
hash
((CausalHashId, BranchHashId) -> CausalHashId)
-> Maybe (CausalHashId, BranchHashId) -> Maybe CausalHashId
forall a b. (a -> b) -> Maybe a -> Maybe b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (CausalHashId, BranchHashId) -> CausalHashId
forall a b. (a, b) -> a
fst (Maybe (CausalHashId, BranchHashId) -> Maybe CausalHashId)
-> IO (Maybe (CausalHashId, BranchHashId))
-> IO (Maybe CausalHashId)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (Codebase IO v a
-> Transaction (Maybe (CausalHashId, BranchHashId))
-> IO (Maybe (CausalHashId, BranchHashId))
forall (m :: * -> *) v a b.
MonadIO m =>
Codebase m v a -> Transaction b -> m b
Codebase.runTransaction Codebase IO v a
codebase (Transaction (Maybe (CausalHashId, BranchHashId))
-> IO (Maybe (CausalHashId, BranchHashId)))
-> Transaction (Maybe (CausalHashId, BranchHashId))
-> IO (Maybe (CausalHashId, BranchHashId))
forall a b. (a -> b) -> a -> b
$ CausalHash -> Transaction (Maybe (CausalHashId, BranchHashId))
Q.loadCausalByCausalHash CausalHash
expectedHash)
withCodebaseEntityStream ::
(MonadIO m) =>
Sqlite.Connection ->
CausalHash ->
Maybe SyncV2.BranchRef ->
(Int -> Stream () SyncV2.DownloadEntitiesChunk -> m r) ->
m r
withCodebaseEntityStream :: forall (m :: * -> *) r.
MonadIO m =>
Connection
-> CausalHash
-> Maybe BranchRef
-> (Int -> Stream () DownloadEntitiesChunk -> m r)
-> m r
withCodebaseEntityStream Connection
conn CausalHash
rootHash Maybe BranchRef
mayBranchRef Int -> Stream () DownloadEntitiesChunk -> m r
callback = do
Map Hash32 (Entity Text Hash32 Hash32)
entities <- IO (Map Hash32 (Entity Text Hash32 Hash32))
-> m (Map Hash32 (Entity Text Hash32 Hash32))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Map Hash32 (Entity Text Hash32 Hash32))
-> m (Map Hash32 (Entity Text Hash32 Hash32)))
-> IO (Map Hash32 (Entity Text Hash32 Hash32))
-> m (Map Hash32 (Entity Text Hash32 Hash32))
forall a b. (a -> b) -> a -> b
$ ((Int -> IO ()) -> IO (Map Hash32 (Entity Text Hash32 Hash32)))
-> IO (Map Hash32 (Entity Text Hash32 Hash32))
forall (m :: * -> *) a.
MonadUnliftIO m =>
((Int -> m ()) -> m a) -> m a
withEntityLoadingCallback (((Int -> IO ()) -> IO (Map Hash32 (Entity Text Hash32 Hash32)))
-> IO (Map Hash32 (Entity Text Hash32 Hash32)))
-> ((Int -> IO ()) -> IO (Map Hash32 (Entity Text Hash32 Hash32)))
-> IO (Map Hash32 (Entity Text Hash32 Hash32))
forall a b. (a -> b) -> a -> b
$ \Int -> IO ()
counter -> do
Connection
-> Transaction (Map Hash32 (Entity Text Hash32 Hash32))
-> IO (Map Hash32 (Entity Text Hash32 Hash32))
forall (m :: * -> *) a.
(MonadIO m, HasCallStack) =>
Connection -> Transaction a -> m a
Sqlite.runTransaction Connection
conn (CausalHash
-> (Int -> IO ())
-> Transaction (Map Hash32 (Entity Text Hash32 Hash32))
depsForCausal CausalHash
rootHash Int -> IO ()
counter)
let totalEntities :: Int
totalEntities = Int -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Int) -> Int -> Int
forall a b. (a -> b) -> a -> b
$ Map Hash32 (Entity Text Hash32 Hash32) -> Int
forall k a. Map k a -> Int
Map.size Map Hash32 (Entity Text Hash32 Hash32)
entities
let initialChunk :: DownloadEntitiesChunk
initialChunk =
StreamInitInfo -> DownloadEntitiesChunk
SyncV2.InitialC
( SyncV2.StreamInitInfo
{ $sel:rootCausalHash:StreamInitInfo :: Hash32
rootCausalHash = CausalHash -> Hash32
causalHashToHash32 CausalHash
rootHash,
$sel:version:StreamInitInfo :: Version
version = Word16 -> Version
SyncV2.Version Word16
1,
$sel:entitySorting:StreamInitInfo :: EntitySorting
entitySorting = EntitySorting
SyncV2.DependenciesFirst,
$sel:numEntities:StreamInitInfo :: Maybe Word64
numEntities = Word64 -> Maybe Word64
forall a. a -> Maybe a
Just (Word64 -> Maybe Word64) -> Word64 -> Maybe Word64
forall a b. (a -> b) -> a -> b
$ Int -> Word64
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
totalEntities,
$sel:rootBranchRef:StreamInitInfo :: Maybe BranchRef
rootBranchRef = Maybe BranchRef
mayBranchRef
}
)
let contents :: [DownloadEntitiesChunk]
contents =
Map Hash32 (Entity Text Hash32 Hash32)
entities
Map Hash32 (Entity Text Hash32 Hash32)
-> (Map Hash32 (Entity Text Hash32 Hash32)
-> Map Hash32 TempEntity)
-> Map Hash32 TempEntity
forall a b. a -> (a -> b) -> b
& (Entity Text Hash32 Hash32 -> TempEntity)
-> Map Hash32 (Entity Text Hash32 Hash32) -> Map Hash32 TempEntity
forall a b. (a -> b) -> Map Hash32 a -> Map Hash32 b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((Hash32 -> Hash32) -> Entity Text Hash32 Hash32 -> TempEntity
forall hash.
(hash -> Hash32) -> Entity Text Hash32 hash -> TempEntity
Sync.entityToTempEntity Hash32 -> Hash32
forall a. a -> a
id)
Map Hash32 TempEntity
-> (Map Hash32 TempEntity -> [(Hash32, TempEntity)])
-> [(Hash32, TempEntity)]
forall a b. a -> (a -> b) -> b
& Map Hash32 TempEntity -> [(Hash32, TempEntity)]
forall k a. Map k a -> [(k, a)]
Map.toList
[(Hash32, TempEntity)]
-> ([(Hash32, TempEntity)] -> [(Hash32, TempEntity)])
-> [(Hash32, TempEntity)]
forall a b. a -> (a -> b) -> b
& [(Hash32, TempEntity)] -> [(Hash32, TempEntity)]
forall (f :: * -> *).
(Foldable f, Functor f) =>
f (Hash32, TempEntity) -> [(Hash32, TempEntity)]
sortDependencyFirst
[(Hash32, TempEntity)]
-> ([(Hash32, TempEntity)] -> [DownloadEntitiesChunk])
-> [DownloadEntitiesChunk]
forall a b. a -> (a -> b) -> b
& ( ((Hash32, TempEntity) -> DownloadEntitiesChunk)
-> [(Hash32, TempEntity)] -> [DownloadEntitiesChunk]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap \(Hash32
hash, TempEntity
entity) ->
let entityCBOR :: CBORBytes TempEntity
entityCBOR = (TempEntity -> CBORBytes TempEntity
forall t. Serialise t => t -> CBORBytes t
CBOR.serialiseCBORBytes TempEntity
entity)
in EntityChunk -> DownloadEntitiesChunk
SyncV2.EntityC (SyncV2.EntityChunk {Hash32
$sel:hash:EntityChunk :: Hash32
hash :: Hash32
hash, CBORBytes TempEntity
$sel:entityCBOR:EntityChunk :: CBORBytes TempEntity
entityCBOR :: CBORBytes TempEntity
entityCBOR})
)
[DownloadEntitiesChunk]
-> ([DownloadEntitiesChunk] -> [DownloadEntitiesChunk])
-> [DownloadEntitiesChunk]
forall a b. a -> (a -> b) -> b
& (DownloadEntitiesChunk
initialChunk DownloadEntitiesChunk
-> [DownloadEntitiesChunk] -> [DownloadEntitiesChunk]
forall a. a -> [a] -> [a]
:)
let stream :: ConduitT
()
(Element [DownloadEntitiesChunk])
(ExceptT SyncErr (ResourceT IO))
()
stream = [DownloadEntitiesChunk]
-> ConduitT
()
(Element [DownloadEntitiesChunk])
(ExceptT SyncErr (ResourceT IO))
()
forall (m :: * -> *) mono i.
(Monad m, MonoFoldable mono) =>
mono -> ConduitT i (Element mono) m ()
C.yieldMany [DownloadEntitiesChunk]
contents
Int -> Stream () DownloadEntitiesChunk -> m r
callback Int
totalEntities ConduitT
()
(Element [DownloadEntitiesChunk])
(ExceptT SyncErr (ResourceT IO))
()
Stream () DownloadEntitiesChunk
stream
where
depsForCausal :: CausalHash -> (Int -> IO ()) -> Sqlite.Transaction (Map Hash32 (Sync.Entity Text Hash32 Hash32))
depsForCausal :: CausalHash
-> (Int -> IO ())
-> Transaction (Map Hash32 (Entity Text Hash32 Hash32))
depsForCausal CausalHash
causalHash Int -> IO ()
counter = do
(StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
-> Map Hash32 (Entity Text Hash32 Hash32)
-> Transaction (Map Hash32 (Entity Text Hash32 Hash32)))
-> Map Hash32 (Entity Text Hash32 Hash32)
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
-> Transaction (Map Hash32 (Entity Text Hash32 Hash32))
forall a b c. (a -> b -> c) -> b -> a -> c
flip StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
-> Map Hash32 (Entity Text Hash32 Hash32)
-> Transaction (Map Hash32 (Entity Text Hash32 Hash32))
forall (m :: * -> *) s a. Monad m => StateT s m a -> s -> m s
execStateT Map Hash32 (Entity Text Hash32 Hash32)
forall a. Monoid a => a
mempty (StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
-> Transaction (Map Hash32 (Entity Text Hash32 Hash32)))
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
-> Transaction (Map Hash32 (Entity Text Hash32 Hash32))
forall a b. (a -> b) -> a -> b
$ Hash32
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
expandEntities (CausalHash -> Hash32
causalHashToHash32 CausalHash
causalHash)
where
expandEntities :: Hash32 -> ((StateT (Map Hash32 (Sync.Entity Text Hash32 Hash32)) Sqlite.Transaction)) ()
expandEntities :: Hash32
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
expandEntities Hash32
hash32 = do
(Map Hash32 (Entity Text Hash32 Hash32) -> Bool)
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction Bool
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets (Hash32 -> Map Hash32 (Entity Text Hash32 Hash32) -> Bool
forall k a. Ord k => k -> Map k a -> Bool
Map.member Hash32
hash32) StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction Bool
-> (Bool
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ())
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
forall a b.
StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction a
-> (a
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction b)
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Bool
True -> ()
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
forall a.
a -> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Bool
False -> do
Entity Text Hash32 Hash32
entity <- Transaction (Entity Text Hash32 Hash32)
-> StateT
(Map Hash32 (Entity Text Hash32 Hash32))
Transaction
(Entity Text Hash32 Hash32)
forall (m :: * -> *) a.
Monad m =>
m a -> StateT (Map Hash32 (Entity Text Hash32 Hash32)) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (Transaction (Entity Text Hash32 Hash32)
-> StateT
(Map Hash32 (Entity Text Hash32 Hash32))
Transaction
(Entity Text Hash32 Hash32))
-> Transaction (Entity Text Hash32 Hash32)
-> StateT
(Map Hash32 (Entity Text Hash32 Hash32))
Transaction
(Entity Text Hash32 Hash32)
forall a b. (a -> b) -> a -> b
$ Hash32 -> Transaction (Entity Text Hash32 Hash32)
Sync.expectEntity Hash32
hash32
(Map Hash32 (Entity Text Hash32 Hash32)
-> Map Hash32 (Entity Text Hash32 Hash32))
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify (Hash32
-> Entity Text Hash32 Hash32
-> Map Hash32 (Entity Text Hash32 Hash32)
-> Map Hash32 (Entity Text Hash32 Hash32)
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert Hash32
hash32 Entity Text Hash32 Hash32
entity)
Transaction ()
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
forall (m :: * -> *) a.
Monad m =>
m a -> StateT (Map Hash32 (Entity Text Hash32 Hash32)) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (Transaction ()
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ())
-> (IO () -> Transaction ())
-> IO ()
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO () -> Transaction ()
forall a. HasCallStack => IO a -> Transaction a
Sqlite.unsafeIO (IO ()
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ())
-> IO ()
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
counter Int
1
Getting
(Traversed
() (StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction))
(Entity Text Hash32 Hash32)
Hash32
-> (Hash32
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ())
-> Entity Text Hash32 Hash32
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
forall (f :: * -> *) r s a.
Functor f =>
Getting (Traversed r f) s a -> (a -> f r) -> s -> f ()
traverseOf_ Getting
(Traversed
() (StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction))
(Entity Text Hash32 Hash32)
Hash32
forall (m :: * -> *) hash' hash text noSyncHash.
(Applicative m, Ord hash') =>
(hash -> m hash')
-> Entity text noSyncHash hash -> m (Entity text noSyncHash hash')
Sync.entityHashes_ Hash32
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
expandEntities Entity Text Hash32 Hash32
entity
_unNetString :: ConduitT ByteString ByteString StreamM ()
_unNetString :: ConduitT ByteString ByteString (ExceptT SyncErr (ResourceT IO)) ()
_unNetString = do
ByteString
bs <- Parser ByteString ByteString
-> ConduitT
ByteString ByteString (ExceptT SyncErr (ResourceT IO)) ByteString
forall a (m :: * -> *) b o.
(AttoparsecInput a, MonadThrow m) =>
Parser a b -> ConduitT a o m b
C.sinkParser (Parser ByteString ByteString
-> ConduitT
ByteString ByteString (ExceptT SyncErr (ResourceT IO)) ByteString)
-> Parser ByteString ByteString
-> ConduitT
ByteString ByteString (ExceptT SyncErr (ResourceT IO)) ByteString
forall a b. (a -> b) -> a -> b
$ do
Int
len <- Parser Int
forall a. Integral a => Parser a
A8.decimal
Char
_ <- Char -> Parser Char
A8.char Char
':'
ByteString
bs <- Int -> Parser ByteString ByteString
A.take Int
len
Char
_ <- Char -> Parser Char
A8.char Char
','
pure ByteString
bs
ByteString
-> ConduitT
ByteString ByteString (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
C.yield ByteString
bs
_decodeFramedEntity :: ByteString -> StreamM SyncV2.DownloadEntitiesChunk
_decodeFramedEntity :: ByteString -> StreamM DownloadEntitiesChunk
_decodeFramedEntity ByteString
bs = do
case ByteString -> Either DeserialiseFailure DownloadEntitiesChunk
forall a. Serialise a => ByteString -> Either DeserialiseFailure a
CBOR.deserialiseOrFail (ByteString -> ByteString
BL.fromStrict ByteString
bs) of
Left DeserialiseFailure
err -> SyncErr -> StreamM DownloadEntitiesChunk
forall a. SyncErr -> ExceptT SyncErr (ResourceT IO) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> StreamM DownloadEntitiesChunk)
-> (SyncError -> SyncErr)
-> SyncError
-> StreamM DownloadEntitiesChunk
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (SyncError -> PullError) -> SyncError -> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncError -> PullError
SyncV2.PullError'Sync (SyncError -> StreamM DownloadEntitiesChunk)
-> SyncError -> StreamM DownloadEntitiesChunk
forall a b. (a -> b) -> a -> b
$ DeserialiseFailure -> SyncError
SyncV2.SyncErrorDeserializationFailure DeserialiseFailure
err
Right DownloadEntitiesChunk
chunk -> DownloadEntitiesChunk -> StreamM DownloadEntitiesChunk
forall a. a -> ExceptT SyncErr (ResourceT IO) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure DownloadEntitiesChunk
chunk
decodeUnframedEntities :: forall a. (CBOR.Serialise a) => Stream ByteString a
decodeUnframedEntities :: forall a. Serialise a => Stream ByteString a
decodeUnframedEntities = (forall a.
ExceptT SyncErr (ST RealWorld) a
-> ExceptT SyncErr (ResourceT IO) a)
-> ConduitT ByteString a (ExceptT SyncErr (ST RealWorld)) ()
-> ConduitT ByteString a (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) (n :: * -> *) i o r.
Monad m =>
(forall a. m a -> n a) -> ConduitT i o m r -> ConduitT i o n r
C.transPipe ((ST RealWorld (Either SyncErr a)
-> ResourceT IO (Either SyncErr a))
-> ExceptT SyncErr (ST RealWorld) a
-> ExceptT SyncErr (ResourceT IO) a
forall (m :: * -> *) e a (n :: * -> *) e' b.
(m (Either e a) -> n (Either e' b))
-> ExceptT e m a -> ExceptT e' n b
mapExceptT (IO (Either SyncErr a) -> ResourceT IO (Either SyncErr a)
forall (m :: * -> *) a. Monad m => m a -> ResourceT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (IO (Either SyncErr a) -> ResourceT IO (Either SyncErr a))
-> (ST RealWorld (Either SyncErr a) -> IO (Either SyncErr a))
-> ST RealWorld (Either SyncErr a)
-> ResourceT IO (Either SyncErr a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ST RealWorld (Either SyncErr a) -> IO (Either SyncErr a)
forall a. ST RealWorld a -> IO a
stToIO)) (ConduitT ByteString a (ExceptT SyncErr (ST RealWorld)) ()
-> ConduitT ByteString a (ExceptT SyncErr (ResourceT IO)) ())
-> ConduitT ByteString a (ExceptT SyncErr (ST RealWorld)) ()
-> ConduitT ByteString a (ExceptT SyncErr (ResourceT IO)) ()
forall a b. (a -> b) -> a -> b
$ do
ConduitT
ByteString a (ExceptT SyncErr (ST RealWorld)) (Maybe ByteString)
forall (m :: * -> *) i o. Monad m => ConduitT i o m (Maybe i)
C.await ConduitT
ByteString a (ExceptT SyncErr (ST RealWorld)) (Maybe ByteString)
-> (Maybe ByteString
-> ConduitT ByteString a (ExceptT SyncErr (ST RealWorld)) ())
-> ConduitT ByteString a (ExceptT SyncErr (ST RealWorld)) ()
forall a b.
ConduitT ByteString a (ExceptT SyncErr (ST RealWorld)) a
-> (a -> ConduitT ByteString a (ExceptT SyncErr (ST RealWorld)) b)
-> ConduitT ByteString a (ExceptT SyncErr (ST RealWorld)) b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Maybe ByteString
Nothing -> () -> ConduitT ByteString a (ExceptT SyncErr (ST RealWorld)) ()
forall a.
a -> ConduitT ByteString a (ExceptT SyncErr (ST RealWorld)) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Just ByteString
bs -> do
Maybe ByteString -> ST RealWorld (IDecode RealWorld a)
d <- ConduitT
ByteString
a
(ExceptT SyncErr (ST RealWorld))
(Maybe ByteString -> ST RealWorld (IDecode RealWorld a))
forall s.
ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a))
newDecoder
ByteString
-> (Maybe ByteString -> ST RealWorld (IDecode RealWorld a))
-> ConduitT ByteString a (ExceptT SyncErr (ST RealWorld)) ()
forall s.
ByteString
-> (Maybe ByteString -> ST s (IDecode s a))
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
loop ByteString
bs Maybe ByteString -> ST RealWorld (IDecode RealWorld a)
d
where
newDecoder :: ConduitT ByteString a (ExceptT SyncErr (ST s)) (Maybe ByteString -> ST s (CBOR.IDecode s a))
newDecoder :: forall s.
ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a))
newDecoder = do
(ExceptT SyncErr (ST s) (IDecode s a)
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) (IDecode s a)
forall (m :: * -> *) a. Monad m => m a -> ConduitT ByteString a m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ExceptT SyncErr (ST s) (IDecode s a)
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) (IDecode s a))
-> (ST s (IDecode s a) -> ExceptT SyncErr (ST s) (IDecode s a))
-> ST s (IDecode s a)
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) (IDecode s a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ST s (IDecode s a) -> ExceptT SyncErr (ST s) (IDecode s a)
forall (m :: * -> *) a. Monad m => m a -> ExceptT SyncErr m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift) ST s (IDecode s a)
forall a s. Serialise a => ST s (IDecode s a)
CBOR.deserialiseIncremental ConduitT ByteString a (ExceptT SyncErr (ST s)) (IDecode s a)
-> (IDecode s a
-> ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a)))
-> ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a))
forall a b.
ConduitT ByteString a (ExceptT SyncErr (ST s)) a
-> (a -> ConduitT ByteString a (ExceptT SyncErr (ST s)) b)
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
CBOR.Done ByteString
_ ByteOffset
_ a
_ -> SyncErr
-> ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a))
forall a.
SyncErr -> ConduitT ByteString a (ExceptT SyncErr (ST s)) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr
-> ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a)))
-> (SyncError -> SyncErr)
-> SyncError
-> ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (SyncError -> PullError) -> SyncError -> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncError -> PullError
SyncV2.PullError'Sync (SyncError
-> ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a)))
-> SyncError
-> ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a))
forall a b. (a -> b) -> a -> b
$ Text -> SyncError
SyncV2.SyncErrorStreamFailure Text
"Invalid initial decoder"
CBOR.Fail ByteString
_ ByteOffset
_ DeserialiseFailure
err -> SyncErr
-> ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a))
forall a.
SyncErr -> ConduitT ByteString a (ExceptT SyncErr (ST s)) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr
-> ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a)))
-> (SyncError -> SyncErr)
-> SyncError
-> ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (SyncError -> PullError) -> SyncError -> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncError -> PullError
SyncV2.PullError'Sync (SyncError
-> ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a)))
-> SyncError
-> ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a))
forall a b. (a -> b) -> a -> b
$ DeserialiseFailure -> SyncError
SyncV2.SyncErrorDeserializationFailure DeserialiseFailure
err
CBOR.Partial Maybe ByteString -> ST s (IDecode s a)
k -> (Maybe ByteString -> ST s (IDecode s a))
-> ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a))
forall a. a -> ConduitT ByteString a (ExceptT SyncErr (ST s)) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe ByteString -> ST s (IDecode s a)
k
loop :: ByteString -> (Maybe ByteString -> ST s (CBOR.IDecode s a)) -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
loop :: forall s.
ByteString
-> (Maybe ByteString -> ST s (IDecode s a))
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
loop ByteString
bs Maybe ByteString -> ST s (IDecode s a)
k = do
(ExceptT SyncErr (ST s) (IDecode s a)
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) (IDecode s a)
forall (m :: * -> *) a. Monad m => m a -> ConduitT ByteString a m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ExceptT SyncErr (ST s) (IDecode s a)
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) (IDecode s a))
-> (ST s (IDecode s a) -> ExceptT SyncErr (ST s) (IDecode s a))
-> ST s (IDecode s a)
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) (IDecode s a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ST s (IDecode s a) -> ExceptT SyncErr (ST s) (IDecode s a)
forall (m :: * -> *) a. Monad m => m a -> ExceptT SyncErr m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift) (Maybe ByteString -> ST s (IDecode s a)
k (ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just ByteString
bs)) ConduitT ByteString a (ExceptT SyncErr (ST s)) (IDecode s a)
-> (IDecode s a
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ())
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall a b.
ConduitT ByteString a (ExceptT SyncErr (ST s)) a
-> (a -> ConduitT ByteString a (ExceptT SyncErr (ST s)) b)
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
CBOR.Fail ByteString
_ ByteOffset
_ DeserialiseFailure
err -> SyncErr -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall a.
SyncErr -> ConduitT ByteString a (ExceptT SyncErr (ST s)) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ())
-> (SyncError -> SyncErr)
-> SyncError
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (SyncError -> PullError) -> SyncError -> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncError -> PullError
SyncV2.PullError'Sync (SyncError -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ())
-> SyncError -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall a b. (a -> b) -> a -> b
$ DeserialiseFailure -> SyncError
SyncV2.SyncErrorDeserializationFailure DeserialiseFailure
err
CBOR.Partial Maybe ByteString -> ST s (IDecode s a)
k' -> do
Maybe ByteString
nextBS <- ConduitT ByteString a (ExceptT SyncErr (ST s)) (Maybe ByteString)
forall (m :: * -> *) i o. Monad m => ConduitT i o m (Maybe i)
C.await
case Maybe ByteString
nextBS of
Maybe ByteString
Nothing -> do
(ExceptT SyncErr (ST s) (IDecode s a)
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) (IDecode s a)
forall (m :: * -> *) a. Monad m => m a -> ConduitT ByteString a m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ExceptT SyncErr (ST s) (IDecode s a)
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) (IDecode s a))
-> (ST s (IDecode s a) -> ExceptT SyncErr (ST s) (IDecode s a))
-> ST s (IDecode s a)
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) (IDecode s a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ST s (IDecode s a) -> ExceptT SyncErr (ST s) (IDecode s a)
forall (m :: * -> *) a. Monad m => m a -> ExceptT SyncErr m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift) (Maybe ByteString -> ST s (IDecode s a)
k' Maybe ByteString
forall a. Maybe a
Nothing) ConduitT ByteString a (ExceptT SyncErr (ST s)) (IDecode s a)
-> (IDecode s a
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ())
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall a b.
ConduitT ByteString a (ExceptT SyncErr (ST s)) a
-> (a -> ConduitT ByteString a (ExceptT SyncErr (ST s)) b)
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
CBOR.Done ByteString
_ ByteOffset
_ a
a -> a -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
C.yield a
a
CBOR.Fail ByteString
_ ByteOffset
_ DeserialiseFailure
err -> SyncErr -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall a.
SyncErr -> ConduitT ByteString a (ExceptT SyncErr (ST s)) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ())
-> (SyncError -> SyncErr)
-> SyncError
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (SyncError -> PullError) -> SyncError -> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncError -> PullError
SyncV2.PullError'Sync (SyncError -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ())
-> SyncError -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall a b. (a -> b) -> a -> b
$ DeserialiseFailure -> SyncError
SyncV2.SyncErrorDeserializationFailure DeserialiseFailure
err
CBOR.Partial Maybe ByteString -> ST s (IDecode s a)
_ -> SyncErr -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall a.
SyncErr -> ConduitT ByteString a (ExceptT SyncErr (ST s)) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ())
-> (SyncError -> SyncErr)
-> SyncError
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (SyncError -> PullError) -> SyncError -> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncError -> PullError
SyncV2.PullError'Sync (SyncError -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ())
-> SyncError -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall a b. (a -> b) -> a -> b
$ Text -> SyncError
SyncV2.SyncErrorStreamFailure Text
"Unexpected end of input"
Just ByteString
bs' ->
ByteString
-> (Maybe ByteString -> ST s (IDecode s a))
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall s.
ByteString
-> (Maybe ByteString -> ST s (IDecode s a))
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
loop ByteString
bs' Maybe ByteString -> ST s (IDecode s a)
k'
CBOR.Done ByteString
rem ByteOffset
_ a
a -> do
a -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
C.yield a
a
if ByteString -> Bool
BS.null ByteString
rem
then do
ConduitT ByteString a (ExceptT SyncErr (ST s)) (Maybe ByteString)
forall (m :: * -> *) i o. Monad m => ConduitT i o m (Maybe i)
C.await ConduitT ByteString a (ExceptT SyncErr (ST s)) (Maybe ByteString)
-> (Maybe ByteString
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ())
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall a b.
ConduitT ByteString a (ExceptT SyncErr (ST s)) a
-> (a -> ConduitT ByteString a (ExceptT SyncErr (ST s)) b)
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Maybe ByteString
Nothing -> () -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall a. a -> ConduitT ByteString a (ExceptT SyncErr (ST s)) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Just ByteString
bs'' -> do
Maybe ByteString -> ST s (IDecode s a)
k <- ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a))
forall s.
ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a))
newDecoder
ByteString
-> (Maybe ByteString -> ST s (IDecode s a))
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall s.
ByteString
-> (Maybe ByteString -> ST s (IDecode s a))
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
loop ByteString
bs'' Maybe ByteString -> ST s (IDecode s a)
k
else do
Maybe ByteString -> ST s (IDecode s a)
k <- ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a))
forall s.
ConduitT
ByteString
a
(ExceptT SyncErr (ST s))
(Maybe ByteString -> ST s (IDecode s a))
newDecoder
ByteString
-> (Maybe ByteString -> ST s (IDecode s a))
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall s.
ByteString
-> (Maybe ByteString -> ST s (IDecode s a))
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
loop ByteString
rem Maybe ByteString -> ST s (IDecode s a)
k
type SyncAPI = ("ucm" Servant.:> "v2" Servant.:> "sync" Servant.:> SyncV2.API)
syncAPI :: Proxy SyncAPI
syncAPI :: Proxy SyncAPI
syncAPI = forall t. Proxy t
forall {k} (t :: k). Proxy t
Proxy @SyncAPI
downloadEntitiesStreamClientM :: SyncV2.DownloadEntitiesRequest -> Servant.ClientM (Servant.SourceT IO (CBORStream SyncV2.DownloadEntitiesChunk))
causalDependenciesStreamClientM :: SyncV2.CausalDependenciesRequest -> Servant.ClientM (Servant.SourceT IO (CBORStream SyncV2.CausalDependenciesChunk))
SyncV2.Routes
{ $sel:downloadEntitiesStream:Routes :: forall mode.
Routes mode
-> mode :- ("entities" :> ("download" :> DownloadEntitiesStream))
downloadEntitiesStream = AsClientT ClientM
:- ("entities" :> ("download" :> DownloadEntitiesStream))
DownloadEntitiesRequest
-> ClientM (SourceIO (CBORStream DownloadEntitiesChunk))
downloadEntitiesStreamClientM,
$sel:causalDependenciesStream:Routes :: forall mode.
Routes mode
-> mode
:- ("entities" :> ("dependencies" :> CausalDependenciesStream))
causalDependenciesStream = AsClientT ClientM
:- ("entities" :> ("dependencies" :> CausalDependenciesStream))
CausalDependenciesRequest
-> ClientM (SourceIO (CBORStream CausalDependenciesChunk))
causalDependenciesStreamClientM
} = Proxy SyncAPI -> Client ClientM SyncAPI
forall api.
HasClient ClientM api =>
Proxy api -> Client ClientM api
Servant.client Proxy SyncAPI
syncAPI
withConduit :: forall r chunk. (CBOR.Serialise chunk) => Servant.ClientEnv -> (Stream () chunk -> StreamM r) -> Servant.ClientM (Servant.SourceIO (CBORStream chunk)) -> StreamM r
withConduit :: forall r chunk.
Serialise chunk =>
ClientEnv
-> (Stream () chunk -> StreamM r)
-> ClientM (SourceIO (CBORStream chunk))
-> StreamM r
withConduit ClientEnv
clientEnv Stream () chunk -> StreamM r
callback ClientM (SourceIO (CBORStream chunk))
clientM = do
ResourceT IO (Either SyncErr r) -> StreamM r
forall e (m :: * -> *) a. m (Either e a) -> ExceptT e m a
ExceptT (ResourceT IO (Either SyncErr r) -> StreamM r)
-> ResourceT IO (Either SyncErr r) -> StreamM r
forall a b. (a -> b) -> a -> b
$ ((forall a. ResourceT IO a -> IO a) -> IO (Either SyncErr r))
-> ResourceT IO (Either SyncErr r)
forall b.
((forall a. ResourceT IO a -> IO a) -> IO b) -> ResourceT IO b
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO \forall a. ResourceT IO a -> IO a
runInIO -> do
ClientM (SourceIO (CBORStream chunk))
-> ClientEnv
-> (Either ClientError (SourceIO (CBORStream chunk))
-> IO (Either SyncErr r))
-> IO (Either SyncErr r)
forall a b.
ClientM a -> ClientEnv -> (Either ClientError a -> IO b) -> IO b
Servant.withClientM ClientM (SourceIO (CBORStream chunk))
clientM ClientEnv
clientEnv ((Either ClientError (SourceIO (CBORStream chunk))
-> IO (Either SyncErr r))
-> IO (Either SyncErr r))
-> (Either ClientError (SourceIO (CBORStream chunk))
-> IO (Either SyncErr r))
-> IO (Either SyncErr r)
forall a b. (a -> b) -> a -> b
$ \case
Left ClientError
err -> Either SyncErr r -> IO (Either SyncErr r)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either SyncErr r -> IO (Either SyncErr r))
-> (CodeserverTransportError -> Either SyncErr r)
-> CodeserverTransportError
-> IO (Either SyncErr r)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncErr -> Either SyncErr r
forall a b. a -> Either a b
Left (SyncErr -> Either SyncErr r)
-> (CodeserverTransportError -> SyncErr)
-> CodeserverTransportError
-> Either SyncErr r
forall b c a. (b -> c) -> (a -> b) -> a -> c
. CodeserverTransportError -> SyncErr
forall e. CodeserverTransportError -> SyncError e
TransportError (CodeserverTransportError -> IO (Either SyncErr r))
-> CodeserverTransportError -> IO (Either SyncErr r)
forall a b. (a -> b) -> a -> b
$ (ClientEnv -> ClientError -> CodeserverTransportError
handleClientError ClientEnv
clientEnv ClientError
err)
Right SourceIO (CBORStream chunk)
sourceT -> do
ConduitT () (CBORStream chunk) (ExceptT SyncErr (ResourceT IO)) ()
conduit <- IO
(ConduitT
() (CBORStream chunk) (ExceptT SyncErr (ResourceT IO)) ())
-> IO
(ConduitT
() (CBORStream chunk) (ExceptT SyncErr (ResourceT IO)) ())
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO
(ConduitT
() (CBORStream chunk) (ExceptT SyncErr (ResourceT IO)) ())
-> IO
(ConduitT
() (CBORStream chunk) (ExceptT SyncErr (ResourceT IO)) ()))
-> IO
(ConduitT
() (CBORStream chunk) (ExceptT SyncErr (ResourceT IO)) ())
-> IO
(ConduitT
() (CBORStream chunk) (ExceptT SyncErr (ResourceT IO)) ())
forall a b. (a -> b) -> a -> b
$ SourceIO (CBORStream chunk)
-> IO
(ConduitT
() (CBORStream chunk) (ExceptT SyncErr (ResourceT IO)) ())
forall chunk a. FromSourceIO chunk a => SourceIO chunk -> IO a
Servant.fromSourceIO SourceIO (CBORStream chunk)
sourceT
(ResourceT IO (Either SyncErr r) -> IO (Either SyncErr r)
forall a. ResourceT IO a -> IO a
runInIO (ResourceT IO (Either SyncErr r) -> IO (Either SyncErr r))
-> (StreamM r -> ResourceT IO (Either SyncErr r))
-> StreamM r
-> IO (Either SyncErr r)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamM r -> ResourceT IO (Either SyncErr r)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (StreamM r -> IO (Either SyncErr r))
-> StreamM r -> IO (Either SyncErr r)
forall a b. (a -> b) -> a -> b
$ Stream () chunk -> StreamM r
callback (ConduitT () (CBORStream chunk) (ExceptT SyncErr (ResourceT IO)) ()
conduit ConduitT () (CBORStream chunk) (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT
(CBORStream chunk) chunk (ExceptT SyncErr (ResourceT IO)) ()
-> Stream () chunk
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| ConduitT
(CBORStream chunk) chunk (ExceptT SyncErr (ResourceT IO)) ()
forall a. Serialise a => Stream (CBORStream a) a
unpackCBORBytesStream))
unpackCBORBytesStream :: (CBOR.Serialise a) => Stream (CBORStream a) a
unpackCBORBytesStream :: forall a. Serialise a => Stream (CBORStream a) a
unpackCBORBytesStream =
(CBORStream a -> ByteString)
-> ConduitT
(CBORStream a) ByteString (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a b. Monad m => (a -> b) -> ConduitT a b m ()
C.map (ByteString -> ByteString
BL.toStrict (ByteString -> ByteString)
-> (CBORStream a -> ByteString) -> CBORStream a -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. Coercible a b => a -> b
forall a b. Coercible a b => a -> b
coerce @_ @BL.ByteString) ConduitT
(CBORStream a) ByteString (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT ByteString a (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT (CBORStream a) a (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| ConduitT ByteString a (ExceptT SyncErr (ResourceT IO)) ()
forall a. Serialise a => Stream ByteString a
decodeUnframedEntities
handleClientError :: Servant.ClientEnv -> Servant.ClientError -> CodeserverTransportError
handleClientError :: ClientEnv -> ClientError -> CodeserverTransportError
handleClientError ClientEnv
clientEnv ClientError
err =
case ClientError
err of
Servant.FailureResponse RequestF () (BaseUrl, ByteString)
_req Response
resp ->
case Status -> Int
HTTP.statusCode (Status -> Int) -> Status -> Int
forall a b. (a -> b) -> a -> b
$ Response -> Status
forall a. ResponseF a -> Status
Servant.responseStatusCode Response
resp of
Int
401 -> BaseUrl -> CodeserverTransportError
Unauthenticated (ClientEnv -> BaseUrl
Servant.baseUrl ClientEnv
clientEnv)
Int
403 -> Text -> CodeserverTransportError
PermissionDenied (Text -> Text
Text.Lazy.toStrict (Text -> Text) -> (ByteString -> Text) -> ByteString -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Text
Text.Lazy.decodeUtf8 (ByteString -> Text) -> ByteString -> Text
forall a b. (a -> b) -> a -> b
$ Response -> ByteString
forall a. ResponseF a -> a
Servant.responseBody Response
resp)
Int
408 -> CodeserverTransportError
Timeout
Int
429 -> CodeserverTransportError
RateLimitExceeded
Int
504 -> CodeserverTransportError
Timeout
Int
_ -> Response -> CodeserverTransportError
UnexpectedResponse Response
resp
Servant.DecodeFailure Text
msg Response
resp -> Text -> Response -> CodeserverTransportError
DecodeFailure Text
msg Response
resp
Servant.UnsupportedContentType MediaType
_ct Response
resp -> Response -> CodeserverTransportError
UnexpectedResponse Response
resp
Servant.InvalidContentTypeHeader Response
resp -> Response -> CodeserverTransportError
UnexpectedResponse Response
resp
Servant.ConnectionError SomeException
_ -> BaseUrl -> CodeserverTransportError
UnreachableCodeserver (ClientEnv -> BaseUrl
Servant.baseUrl ClientEnv
clientEnv)
httpStreamEntities ::
(Int -> IO ()) ->
Auth.AuthenticatedHttpClient ->
Servant.BaseUrl ->
SyncV2.DownloadEntitiesRequest ->
(SyncV2.StreamInitInfo -> Stream () SyncV2.EntityChunk -> StreamM ()) ->
StreamM ()
httpStreamEntities :: (Int -> IO ())
-> AuthenticatedHttpClient
-> BaseUrl
-> DownloadEntitiesRequest
-> (StreamInitInfo
-> Stream () EntityChunk -> ExceptT SyncErr (ResourceT IO) ())
-> ExceptT SyncErr (ResourceT IO) ()
httpStreamEntities Int -> IO ()
setTotal (Auth.AuthenticatedHttpClient Manager
httpClient) BaseUrl
unisonShareUrl DownloadEntitiesRequest
req StreamInitInfo
-> Stream () EntityChunk -> ExceptT SyncErr (ResourceT IO) ()
callback = do
let clientEnv :: ClientEnv
clientEnv =
(Manager -> BaseUrl -> ClientEnv
Servant.mkClientEnv Manager
httpClient BaseUrl
unisonShareUrl)
{ Servant.makeClientRequest = \BaseUrl
url Request
request ->
(BaseUrl -> Request -> IO Request
Servant.defaultMakeClientRequest BaseUrl
url Request
request)
IO Request -> (Request -> Request) -> IO Request
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \Request
r ->
Request
r
{ Http.Client.responseTimeout = Http.Client.responseTimeoutNone
}
}
(DownloadEntitiesRequest
-> ClientM (SourceIO (CBORStream DownloadEntitiesChunk))
downloadEntitiesStreamClientM DownloadEntitiesRequest
req) ClientM (SourceIO (CBORStream DownloadEntitiesChunk))
-> (ClientM (SourceIO (CBORStream DownloadEntitiesChunk))
-> ExceptT SyncErr (ResourceT IO) ())
-> ExceptT SyncErr (ResourceT IO) ()
forall a b. a -> (a -> b) -> b
& ClientEnv
-> (Stream () DownloadEntitiesChunk
-> ExceptT SyncErr (ResourceT IO) ())
-> ClientM (SourceIO (CBORStream DownloadEntitiesChunk))
-> ExceptT SyncErr (ResourceT IO) ()
forall r chunk.
Serialise chunk =>
ClientEnv
-> (Stream () chunk -> StreamM r)
-> ClientM (SourceIO (CBORStream chunk))
-> StreamM r
withConduit ClientEnv
clientEnv \Stream () DownloadEntitiesChunk
stream -> do
(StreamInitInfo
init, Stream () EntityChunk
entityStream) <- (Int -> IO ())
-> Stream () DownloadEntitiesChunk
-> StreamM (StreamInitInfo, Stream () EntityChunk)
initializeStream Int -> IO ()
setTotal Stream () DownloadEntitiesChunk
stream
StreamInitInfo
-> Stream () EntityChunk -> ExceptT SyncErr (ResourceT IO) ()
callback StreamInitInfo
init Stream () EntityChunk
entityStream
initializeStream :: (Int -> IO ()) -> Stream () SyncV2.DownloadEntitiesChunk -> StreamM (SyncV2.StreamInitInfo, Stream () SyncV2.EntityChunk)
initializeStream :: (Int -> IO ())
-> Stream () DownloadEntitiesChunk
-> StreamM (StreamInitInfo, Stream () EntityChunk)
initializeStream Int -> IO ()
setTotal Stream () DownloadEntitiesChunk
stream = do
(SealedConduitT
() DownloadEntitiesChunk (ExceptT SyncErr (ResourceT IO)) ()
streamRemainder, Maybe DownloadEntitiesChunk
init) <- Stream () DownloadEntitiesChunk
stream Stream () DownloadEntitiesChunk
-> ConduitT
DownloadEntitiesChunk
Void
(ExceptT SyncErr (ResourceT IO))
(Maybe DownloadEntitiesChunk)
-> ExceptT
SyncErr
(ResourceT IO)
(SealedConduitT
() DownloadEntitiesChunk (ExceptT SyncErr (ResourceT IO)) (),
Maybe DownloadEntitiesChunk)
forall (m :: * -> *) a b.
Monad m =>
ConduitT () a m ()
-> ConduitT a Void m b -> m (SealedConduitT () a m (), b)
C.$$+ ConduitT
DownloadEntitiesChunk
Void
(ExceptT SyncErr (ResourceT IO))
(Maybe DownloadEntitiesChunk)
forall (m :: * -> *) i o. Monad m => ConduitT i o m (Maybe i)
C.headC
case Maybe DownloadEntitiesChunk
init of
Maybe DownloadEntitiesChunk
Nothing -> SyncErr -> StreamM (StreamInitInfo, Stream () EntityChunk)
forall a. SyncErr -> ExceptT SyncErr (ResourceT IO) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> StreamM (StreamInitInfo, Stream () EntityChunk))
-> (SyncError -> SyncErr)
-> SyncError
-> StreamM (StreamInitInfo, Stream () EntityChunk)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (SyncError -> PullError) -> SyncError -> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncError -> PullError
SyncV2.PullError'Sync (SyncError -> StreamM (StreamInitInfo, Stream () EntityChunk))
-> SyncError -> StreamM (StreamInitInfo, Stream () EntityChunk)
forall a b. (a -> b) -> a -> b
$ SyncError
SyncV2.SyncErrorMissingInitialChunk
Just DownloadEntitiesChunk
chunk -> do
case DownloadEntitiesChunk
chunk of
SyncV2.InitialC StreamInitInfo
info -> do
let entityStream :: Stream () EntityChunk
entityStream = SealedConduitT
() DownloadEntitiesChunk (ExceptT SyncErr (ResourceT IO)) ()
-> Stream () DownloadEntitiesChunk
forall (m :: * -> *) i o r.
Monad m =>
SealedConduitT i o m r -> ConduitT i o m r
C.unsealConduitT SealedConduitT
() DownloadEntitiesChunk (ExceptT SyncErr (ResourceT IO)) ()
streamRemainder Stream () DownloadEntitiesChunk
-> ConduitT
DownloadEntitiesChunk
EntityChunk
(ExceptT SyncErr (ResourceT IO))
()
-> Stream () EntityChunk
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| (DownloadEntitiesChunk
-> ExceptT SyncErr (ResourceT IO) EntityChunk)
-> ConduitT
DownloadEntitiesChunk
EntityChunk
(ExceptT SyncErr (ResourceT IO))
()
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> ConduitT a b m ()
C.mapM DownloadEntitiesChunk -> ExceptT SyncErr (ResourceT IO) EntityChunk
parseEntity
Maybe Word64
-> (Word64 -> ExceptT SyncErr (ResourceT IO) ())
-> ExceptT SyncErr (ResourceT IO) (Maybe ())
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
t a -> (a -> f b) -> f (t b)
for (StreamInitInfo -> Maybe Word64
SyncV2.numEntities StreamInitInfo
info) \Word64
t -> IO () -> ExceptT SyncErr (ResourceT IO) ()
forall a. IO a -> ExceptT SyncErr (ResourceT IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ExceptT SyncErr (ResourceT IO) ())
-> IO () -> ExceptT SyncErr (ResourceT IO) ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
setTotal (Word64 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word64
t)
pure $ (StreamInitInfo
info, Stream () EntityChunk
entityStream)
SyncV2.EntityC EntityChunk
_ -> do
SyncErr -> StreamM (StreamInitInfo, Stream () EntityChunk)
forall a. SyncErr -> ExceptT SyncErr (ResourceT IO) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> StreamM (StreamInitInfo, Stream () EntityChunk))
-> (SyncError -> SyncErr)
-> SyncError
-> StreamM (StreamInitInfo, Stream () EntityChunk)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (SyncError -> PullError) -> SyncError -> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncError -> PullError
SyncV2.PullError'Sync (SyncError -> StreamM (StreamInitInfo, Stream () EntityChunk))
-> SyncError -> StreamM (StreamInitInfo, Stream () EntityChunk)
forall a b. (a -> b) -> a -> b
$ SyncError
SyncV2.SyncErrorMissingInitialChunk
SyncV2.ErrorC (SyncV2.ErrorChunk DownloadEntitiesError
err) -> SyncErr -> StreamM (StreamInitInfo, Stream () EntityChunk)
forall a. SyncErr -> ExceptT SyncErr (ResourceT IO) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> StreamM (StreamInitInfo, Stream () EntityChunk))
-> (DownloadEntitiesError -> SyncErr)
-> DownloadEntitiesError
-> StreamM (StreamInitInfo, Stream () EntityChunk)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (DownloadEntitiesError -> PullError)
-> DownloadEntitiesError
-> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DownloadEntitiesError -> PullError
SyncV2.PullError'DownloadEntities (DownloadEntitiesError
-> StreamM (StreamInitInfo, Stream () EntityChunk))
-> DownloadEntitiesError
-> StreamM (StreamInitInfo, Stream () EntityChunk)
forall a b. (a -> b) -> a -> b
$ DownloadEntitiesError
err
where
parseEntity :: SyncV2.DownloadEntitiesChunk -> StreamM SyncV2.EntityChunk
parseEntity :: DownloadEntitiesChunk -> ExceptT SyncErr (ResourceT IO) EntityChunk
parseEntity = \case
SyncV2.EntityC EntityChunk
chunk -> EntityChunk -> ExceptT SyncErr (ResourceT IO) EntityChunk
forall a. a -> ExceptT SyncErr (ResourceT IO) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure EntityChunk
chunk
SyncV2.ErrorC (SyncV2.ErrorChunk DownloadEntitiesError
err) -> SyncErr -> ExceptT SyncErr (ResourceT IO) EntityChunk
forall a. SyncErr -> ExceptT SyncErr (ResourceT IO) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> ExceptT SyncErr (ResourceT IO) EntityChunk)
-> (PullError -> SyncErr)
-> PullError
-> ExceptT SyncErr (ResourceT IO) EntityChunk
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> ExceptT SyncErr (ResourceT IO) EntityChunk)
-> PullError -> ExceptT SyncErr (ResourceT IO) EntityChunk
forall a b. (a -> b) -> a -> b
$ DownloadEntitiesError -> PullError
SyncV2.PullError'DownloadEntities DownloadEntitiesError
err
SyncV2.InitialC {} -> SyncErr -> ExceptT SyncErr (ResourceT IO) EntityChunk
forall a. SyncErr -> ExceptT SyncErr (ResourceT IO) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> ExceptT SyncErr (ResourceT IO) EntityChunk)
-> (PullError -> SyncErr)
-> PullError
-> ExceptT SyncErr (ResourceT IO) EntityChunk
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> ExceptT SyncErr (ResourceT IO) EntityChunk)
-> PullError -> ExceptT SyncErr (ResourceT IO) EntityChunk
forall a b. (a -> b) -> a -> b
$ SyncError -> PullError
SyncV2.PullError'Sync SyncError
SyncV2.SyncErrorMisplacedInitialChunk
httpStreamCausalDependencies ::
forall r.
Auth.AuthenticatedHttpClient ->
Servant.BaseUrl ->
SyncV2.CausalDependenciesRequest ->
(Stream () SyncV2.CausalDependenciesChunk -> StreamM r) ->
StreamM r
httpStreamCausalDependencies :: forall r.
AuthenticatedHttpClient
-> BaseUrl
-> CausalDependenciesRequest
-> (Stream () CausalDependenciesChunk -> StreamM r)
-> StreamM r
httpStreamCausalDependencies (Auth.AuthenticatedHttpClient Manager
httpClient) BaseUrl
unisonShareUrl CausalDependenciesRequest
req Stream () CausalDependenciesChunk -> StreamM r
callback = do
let clientEnv :: ClientEnv
clientEnv =
(Manager -> BaseUrl -> ClientEnv
Servant.mkClientEnv Manager
httpClient BaseUrl
unisonShareUrl)
{ Servant.makeClientRequest = \BaseUrl
url Request
request ->
(BaseUrl -> Request -> IO Request
Servant.defaultMakeClientRequest BaseUrl
url Request
request)
IO Request -> (Request -> Request) -> IO Request
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \Request
r ->
Request
r
{ Http.Client.responseTimeout = Http.Client.responseTimeoutNone
}
}
(CausalDependenciesRequest
-> ClientM (SourceIO (CBORStream CausalDependenciesChunk))
causalDependenciesStreamClientM CausalDependenciesRequest
req) ClientM (SourceIO (CBORStream CausalDependenciesChunk))
-> (ClientM (SourceIO (CBORStream CausalDependenciesChunk))
-> StreamM r)
-> StreamM r
forall a b. a -> (a -> b) -> b
& ClientEnv
-> (Stream () CausalDependenciesChunk -> StreamM r)
-> ClientM (SourceIO (CBORStream CausalDependenciesChunk))
-> StreamM r
forall r chunk.
Serialise chunk =>
ClientEnv
-> (Stream () chunk -> StreamM r)
-> ClientM (SourceIO (CBORStream chunk))
-> StreamM r
withConduit ClientEnv
clientEnv Stream () CausalDependenciesChunk -> StreamM r
callback
negotiateKnownCausals ::
Servant.BaseUrl ->
SyncV2.BranchRef ->
Share.HashJWT ->
Cli (Either (SyncError SyncV2.PullError) (Set Hash32))
negotiateKnownCausals :: BaseUrl
-> BranchRef -> HashJWT -> Cli (Either SyncErr (Set Hash32))
negotiateKnownCausals BaseUrl
unisonShareUrl BranchRef
branchRef HashJWT
hashJwt = FilePath
-> Cli (Either SyncErr (Set Hash32))
-> Cli (Either SyncErr (Set Hash32))
forall (m :: * -> *) a. MonadIO m => FilePath -> m a -> m a
Timing.time FilePath
"Causal Negotiation" (Cli (Either SyncErr (Set Hash32))
-> Cli (Either SyncErr (Set Hash32)))
-> Cli (Either SyncErr (Set Hash32))
-> Cli (Either SyncErr (Set Hash32))
forall a b. (a -> b) -> a -> b
$ do
Cli.Env {AuthenticatedHttpClient
$sel:authHTTPClient:Env :: Env -> AuthenticatedHttpClient
authHTTPClient :: AuthenticatedHttpClient
authHTTPClient, Codebase IO Symbol Ann
$sel:codebase:Env :: Env -> Codebase IO Symbol Ann
codebase :: Codebase IO Symbol Ann
codebase} <- Cli Env
forall r (m :: * -> *). MonadReader r m => m r
ask
IO (Either SyncErr (Set Hash32))
-> Cli (Either SyncErr (Set Hash32))
forall a. IO a -> Cli a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either SyncErr (Set Hash32))
-> Cli (Either SyncErr (Set Hash32)))
-> IO (Either SyncErr (Set Hash32))
-> Cli (Either SyncErr (Set Hash32))
forall a b. (a -> b) -> a -> b
$ IO (Either SyncErr (Set Hash32))
-> IO (Either SyncErr (Set Hash32))
forall (m :: * -> *) a. (MonadIO m, MonadMask m) => m a -> m a
Console.Regions.displayConsoleRegions do
RegionLayout
-> (ConsoleRegion -> IO (Either SyncErr (Set Hash32)))
-> IO (Either SyncErr (Set Hash32))
forall (m :: * -> *) a.
(MonadIO m, MonadMask m) =>
RegionLayout -> (ConsoleRegion -> m a) -> m a
Console.Regions.withConsoleRegion RegionLayout
Console.Regions.Linear \ConsoleRegion
region -> do
forall v (m :: * -> *).
(ToRegionContent v, LiftRegion m) =>
ConsoleRegion -> v -> m ()
Console.Regions.setConsoleRegion @Text @IO ConsoleRegion
region (Text -> IO ()) -> Text -> IO ()
forall a b. (a -> b) -> a -> b
$ Text
" 🔎 Identifying missing entities..."
ResourceT IO (Either SyncErr (Set Hash32))
-> IO (Either SyncErr (Set Hash32))
forall (m :: * -> *) a. MonadUnliftIO m => ResourceT m a -> m a
C.runResourceT
(ResourceT IO (Either SyncErr (Set Hash32))
-> IO (Either SyncErr (Set Hash32)))
-> (ExceptT SyncErr (ResourceT IO) (Set Hash32)
-> ResourceT IO (Either SyncErr (Set Hash32)))
-> ExceptT SyncErr (ResourceT IO) (Set Hash32)
-> IO (Either SyncErr (Set Hash32))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ExceptT SyncErr (ResourceT IO) (Set Hash32)
-> ResourceT IO (Either SyncErr (Set Hash32))
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT
(ExceptT SyncErr (ResourceT IO) (Set Hash32)
-> IO (Either SyncErr (Set Hash32)))
-> ExceptT SyncErr (ResourceT IO) (Set Hash32)
-> IO (Either SyncErr (Set Hash32))
forall a b. (a -> b) -> a -> b
$ AuthenticatedHttpClient
-> BaseUrl
-> CausalDependenciesRequest
-> (Stream () CausalDependenciesChunk
-> ExceptT SyncErr (ResourceT IO) (Set Hash32))
-> ExceptT SyncErr (ResourceT IO) (Set Hash32)
forall r.
AuthenticatedHttpClient
-> BaseUrl
-> CausalDependenciesRequest
-> (Stream () CausalDependenciesChunk -> StreamM r)
-> StreamM r
httpStreamCausalDependencies
AuthenticatedHttpClient
authHTTPClient
BaseUrl
unisonShareUrl
SyncV2.CausalDependenciesRequest {BranchRef
branchRef :: BranchRef
$sel:branchRef:CausalDependenciesRequest :: BranchRef
branchRef, $sel:rootCausal:CausalDependenciesRequest :: HashJWT
rootCausal = HashJWT
hashJwt}
\Stream () CausalDependenciesChunk
stream -> do
[Hash32] -> Set Hash32
forall a. Ord a => [a] -> Set a
Set.fromList ([Hash32] -> Set Hash32)
-> ExceptT SyncErr (ResourceT IO) [Hash32]
-> ExceptT SyncErr (ResourceT IO) (Set Hash32)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ConduitT () Void (ExceptT SyncErr (ResourceT IO)) [Hash32]
-> ExceptT SyncErr (ResourceT IO) [Hash32]
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
C.runConduit (Stream () CausalDependenciesChunk
stream Stream () CausalDependenciesChunk
-> ConduitT
CausalDependenciesChunk
Void
(ExceptT SyncErr (ResourceT IO))
[Hash32]
-> ConduitT () Void (ExceptT SyncErr (ResourceT IO)) [Hash32]
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| (CausalDependenciesChunk -> (Hash32, DependencyType))
-> ConduitT
CausalDependenciesChunk
(Hash32, DependencyType)
(ExceptT SyncErr (ResourceT IO))
()
forall (m :: * -> *) a b. Monad m => (a -> b) -> ConduitT a b m ()
C.map CausalDependenciesChunk -> (Hash32, DependencyType)
unpack ConduitT
CausalDependenciesChunk
(Hash32, DependencyType)
(ExceptT SyncErr (ResourceT IO))
()
-> ConduitT
(Hash32, DependencyType)
Void
(ExceptT SyncErr (ResourceT IO))
[Hash32]
-> ConduitT
CausalDependenciesChunk
Void
(ExceptT SyncErr (ResourceT IO))
[Hash32]
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| Codebase IO Symbol Ann -> Stream (Hash32, DependencyType) Hash32
forall v a.
Codebase IO v a -> Stream (Hash32, DependencyType) Hash32
findKnownDeps Codebase IO Symbol Ann
codebase Stream (Hash32, DependencyType) Hash32
-> ConduitT Hash32 Void (ExceptT SyncErr (ResourceT IO)) [Hash32]
-> ConduitT
(Hash32, DependencyType)
Void
(ExceptT SyncErr (ResourceT IO))
[Hash32]
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| ConduitT Hash32 Void (ExceptT SyncErr (ResourceT IO)) [Hash32]
forall (m :: * -> *) a o. Monad m => ConduitT a o m [a]
C.sinkList)
where
findKnownDeps :: Codebase.Codebase IO v a -> Stream (Hash32, DependencyType) Hash32
findKnownDeps :: forall v a.
Codebase IO v a -> Stream (Hash32, DependencyType) Hash32
findKnownDeps Codebase IO v a
codebase = do
ConduitT
(Hash32, DependencyType)
Hash32
(ExceptT SyncErr (ResourceT IO))
(Maybe (Hash32, DependencyType))
forall (m :: * -> *) i o. Monad m => ConduitT i o m (Maybe i)
C.await ConduitT
(Hash32, DependencyType)
Hash32
(ExceptT SyncErr (ResourceT IO))
(Maybe (Hash32, DependencyType))
-> (Maybe (Hash32, DependencyType)
-> Stream (Hash32, DependencyType) Hash32)
-> Stream (Hash32, DependencyType) Hash32
forall a b.
ConduitT
(Hash32, DependencyType) Hash32 (ExceptT SyncErr (ResourceT IO)) a
-> (a
-> ConduitT
(Hash32, DependencyType) Hash32 (ExceptT SyncErr (ResourceT IO)) b)
-> ConduitT
(Hash32, DependencyType) Hash32 (ExceptT SyncErr (ResourceT IO)) b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Just (Hash32
hash, DependencyType
LibDependency) -> do
ConduitT
(Hash32, DependencyType)
Hash32
(ExceptT SyncErr (ResourceT IO))
Bool
-> Stream (Hash32, DependencyType) Hash32
-> Stream (Hash32, DependencyType) Hash32
forall (m :: * -> *). Monad m => m Bool -> m () -> m ()
whenM (ExceptT SyncErr (ResourceT IO) Bool
-> ConduitT
(Hash32, DependencyType)
Hash32
(ExceptT SyncErr (ResourceT IO))
Bool
forall (m :: * -> *) a.
Monad m =>
m a -> ConduitT (Hash32, DependencyType) Hash32 m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ExceptT SyncErr (ResourceT IO) Bool
-> ConduitT
(Hash32, DependencyType)
Hash32
(ExceptT SyncErr (ResourceT IO))
Bool)
-> ExceptT SyncErr (ResourceT IO) Bool
-> ConduitT
(Hash32, DependencyType)
Hash32
(ExceptT SyncErr (ResourceT IO))
Bool
forall a b. (a -> b) -> a -> b
$ Codebase IO v a -> Hash32 -> ExceptT SyncErr (ResourceT IO) Bool
forall v a.
Codebase IO v a -> Hash32 -> ExceptT SyncErr (ResourceT IO) Bool
haveCausalHash Codebase IO v a
codebase Hash32
hash) (Hash32 -> Stream (Hash32, DependencyType) Hash32
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
C.yield Hash32
hash)
Codebase IO v a -> Stream (Hash32, DependencyType) Hash32
forall v a.
Codebase IO v a -> Stream (Hash32, DependencyType) Hash32
findKnownDeps Codebase IO v a
codebase
Just (Hash32
hash, DependencyType
CausalSpineDependency) -> do
ExceptT SyncErr (ResourceT IO) Bool
-> ConduitT
(Hash32, DependencyType)
Hash32
(ExceptT SyncErr (ResourceT IO))
Bool
forall (m :: * -> *) a.
Monad m =>
m a -> ConduitT (Hash32, DependencyType) Hash32 m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (Codebase IO v a -> Hash32 -> ExceptT SyncErr (ResourceT IO) Bool
forall v a.
Codebase IO v a -> Hash32 -> ExceptT SyncErr (ResourceT IO) Bool
haveCausalHash Codebase IO v a
codebase Hash32
hash) ConduitT
(Hash32, DependencyType)
Hash32
(ExceptT SyncErr (ResourceT IO))
Bool
-> (Bool -> Stream (Hash32, DependencyType) Hash32)
-> Stream (Hash32, DependencyType) Hash32
forall a b.
ConduitT
(Hash32, DependencyType) Hash32 (ExceptT SyncErr (ResourceT IO)) a
-> (a
-> ConduitT
(Hash32, DependencyType) Hash32 (ExceptT SyncErr (ResourceT IO)) b)
-> ConduitT
(Hash32, DependencyType) Hash32 (ExceptT SyncErr (ResourceT IO)) b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Bool
True -> do
Hash32 -> Stream (Hash32, DependencyType) Hash32
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
C.yield Hash32
hash
Bool
False -> do
Codebase IO v a -> Stream (Hash32, DependencyType) Hash32
forall v a.
Codebase IO v a -> Stream (Hash32, DependencyType) Hash32
findKnownDeps Codebase IO v a
codebase
Maybe (Hash32, DependencyType)
Nothing -> () -> Stream (Hash32, DependencyType) Hash32
forall a.
a
-> ConduitT
(Hash32, DependencyType) Hash32 (ExceptT SyncErr (ResourceT IO)) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
unpack :: SyncV2.CausalDependenciesChunk -> (Hash32, DependencyType)
unpack :: CausalDependenciesChunk -> (Hash32, DependencyType)
unpack = \case
SyncV2.CausalHashDepC {Hash32
causalHash :: Hash32
$sel:causalHash:CausalHashDepC :: CausalDependenciesChunk -> Hash32
causalHash, DependencyType
dependencyType :: DependencyType
$sel:dependencyType:CausalHashDepC :: CausalDependenciesChunk -> DependencyType
dependencyType} -> (Hash32
causalHash, DependencyType
dependencyType)
haveCausalHash :: Codebase.Codebase IO v a -> Hash32 -> StreamM Bool
haveCausalHash :: forall v a.
Codebase IO v a -> Hash32 -> ExceptT SyncErr (ResourceT IO) Bool
haveCausalHash Codebase IO v a
codebase Hash32
causalHash = do
IO Bool -> ExceptT SyncErr (ResourceT IO) Bool
forall a. IO a -> ExceptT SyncErr (ResourceT IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> ExceptT SyncErr (ResourceT IO) Bool)
-> IO Bool -> ExceptT SyncErr (ResourceT IO) Bool
forall a b. (a -> b) -> a -> b
$ Codebase IO v a -> Transaction Bool -> IO Bool
forall (m :: * -> *) v a b.
MonadIO m =>
Codebase m v a -> Transaction b -> m b
Codebase.runTransaction Codebase IO v a
codebase do
Hash32 -> Transaction Bool
Q.causalExistsByHash32 Hash32
causalHash
counterProgress :: (MonadIO m, MonadUnliftIO n) => (Int -> Text) -> ((Int -> m ()) -> n a) -> n a
counterProgress :: forall (m :: * -> *) (n :: * -> *) a.
(MonadIO m, MonadUnliftIO n) =>
(Int -> Text) -> ((Int -> m ()) -> n a) -> n a
counterProgress Int -> Text
msgBuilder (Int -> m ()) -> n a
action = do
TVar Int
counterVar <- Int -> n (TVar Int)
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
IO.newTVarIO (Int
0 :: Int)
((forall a. n a -> IO a) -> IO a) -> n a
forall b. ((forall a. n a -> IO a) -> IO b) -> n b
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
IO.withRunInIO \forall a. n a -> IO a
toIO -> do
IO a -> IO a
forall (m :: * -> *) a. (MonadIO m, MonadMask m) => m a -> m a
Console.Regions.displayConsoleRegions do
RegionLayout -> (ConsoleRegion -> IO a) -> IO a
forall (m :: * -> *) a.
(MonadIO m, MonadMask m) =>
RegionLayout -> (ConsoleRegion -> m a) -> m a
Console.Regions.withConsoleRegion RegionLayout
Console.Regions.Linear \ConsoleRegion
region -> do
ConsoleRegion -> STM Text -> IO ()
forall v (m :: * -> *).
(ToRegionContent v, LiftRegion m) =>
ConsoleRegion -> v -> m ()
Console.Regions.setConsoleRegion ConsoleRegion
region do
Int
num <- TVar Int -> STM Int
forall a. TVar a -> STM a
IO.readTVar TVar Int
counterVar
pure $ Int -> Text
msgBuilder Int
num
n a -> IO a
forall a. n a -> IO a
toIO (n a -> IO a) -> n a -> IO a
forall a b. (a -> b) -> a -> b
$ (Int -> m ()) -> n a
action ((Int -> m ()) -> n a) -> (Int -> m ()) -> n a
forall a b. (a -> b) -> a -> b
$ \Int
i -> do
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
IO.atomically (TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
IO.modifyTVar' TVar Int
counterVar (Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
i))
syncToFileProgress :: (MonadIO m, MonadUnliftIO n) => Maybe Int -> (ConduitT i i m () -> n a) -> n a
syncToFileProgress :: forall (m :: * -> *) (n :: * -> *) i a.
(MonadIO m, MonadUnliftIO n) =>
Maybe Int -> (ConduitT i i m () -> n a) -> n a
syncToFileProgress Maybe Int
total ConduitT i i m () -> n a
action = do
let msg :: Int -> Text
msg Int
n = Text
"\n Exported " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Int -> Text
forall a. Show a => a -> Text
tShow Int
n Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> (Int -> Text) -> Maybe Int -> Text
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Text
"" (\Int
total -> Text
" / " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Int -> Text
forall a. Show a => a -> Text
tShow Int
total) Maybe Int
total Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" entities 📦 \n\n"
let action' :: (Int -> m ()) -> n a
action' Int -> m ()
f = ConduitT i i m () -> n a
action ((i -> m ()) -> ConduitT i i m ()
forall (m :: * -> *) a. Monad m => (a -> m ()) -> ConduitT a a m ()
C.iterM \i
_i -> Int -> m ()
f Int
1)
(Int -> Text) -> ((Int -> m ()) -> n a) -> n a
forall (m :: * -> *) (n :: * -> *) a.
(MonadIO m, MonadUnliftIO n) =>
(Int -> Text) -> ((Int -> m ()) -> n a) -> n a
counterProgress Int -> Text
msg (Int -> m ()) -> n a
action'
withEntityLoadingCallback :: (MonadUnliftIO m) => ((Int -> m ()) -> m a) -> m a
withEntityLoadingCallback :: forall (m :: * -> *) a.
MonadUnliftIO m =>
((Int -> m ()) -> m a) -> m a
withEntityLoadingCallback (Int -> m ()) -> m a
action = do
let msg :: a -> Text
msg a
n = Text
"\n Loading entities from codebase: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> a -> Text
forall a. Show a => a -> Text
tShow a
n Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" 📦\n\n"
(Int -> Text) -> ((Int -> m ()) -> m a) -> m a
forall (m :: * -> *) (n :: * -> *) a.
(MonadIO m, MonadUnliftIO n) =>
(Int -> Text) -> ((Int -> m ()) -> n a) -> n a
counterProgress Int -> Text
forall a. Show a => a -> Text
msg (Int -> m ()) -> m a
action
withStreamProgress :: (MonadUnliftIO n) => Bool -> (ProgressCallbacks -> n a) -> n a
withStreamProgress :: forall (n :: * -> *) a.
MonadUnliftIO n =>
Bool -> (ProgressCallbacks -> n a) -> n a
withStreamProgress Bool
hasDownload ProgressCallbacks -> n a
action = do
TVar Int
downloadedVar <- Int -> n (TVar Int)
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
IO.newTVarIO Int
0
TVar Bool
doneUnpackingVar <- Bool -> n (TVar Bool)
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
IO.newTVarIO Bool
False
TVar Int
savedVar <- Int -> n (TVar Int)
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
IO.newTVarIO (Int
0 :: Int)
TVar (Maybe Int)
totalVar <- Maybe Int -> n (TVar (Maybe Int))
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
IO.newTVarIO Maybe Int
forall a. Maybe a
Nothing
((forall a. n a -> IO a) -> IO a) -> n a
forall b. ((forall a. n a -> IO a) -> IO b) -> n b
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
IO.withRunInIO \forall a. n a -> IO a
toIO -> do
IO a -> IO a
forall (m :: * -> *) a. (MonadIO m, MonadMask m) => m a -> m a
Console.Regions.displayConsoleRegions do
RegionLayout -> (ConsoleRegion -> IO a) -> IO a
forall (m :: * -> *) a.
(MonadIO m, MonadMask m) =>
RegionLayout -> (ConsoleRegion -> m a) -> m a
Console.Regions.withConsoleRegion RegionLayout
Console.Regions.Linear \ConsoleRegion
region -> do
ConsoleRegion -> STM Text -> IO ()
forall v (m :: * -> *).
(ToRegionContent v, LiftRegion m) =>
ConsoleRegion -> v -> m ()
Console.Regions.setConsoleRegion ConsoleRegion
region do
Int
downloaded <- TVar Int -> STM Int
forall a. TVar a -> STM a
IO.readTVar TVar Int
downloadedVar
Bool
doneUnpacking <- TVar Bool -> STM Bool
forall a. TVar a -> STM a
IO.readTVar TVar Bool
doneUnpackingVar
Int
saved <- TVar Int -> STM Int
forall a. TVar a -> STM a
IO.readTVar TVar Int
savedVar
Maybe Int
total <- TVar (Maybe Int) -> STM (Maybe Int)
forall a. TVar a -> STM a
IO.readTVar TVar (Maybe Int)
totalVar
pure $
[Text] -> Text
Text.unlines
[ Bool -> Text -> Text
forall a. Monoid a => Bool -> a -> a
Monoid.whenM Bool
hasDownload (Text -> Text) -> Text -> Text
forall a b. (a -> b) -> a -> b
$ Text
"\n Downloaded: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> Text
tShow @Int Int
downloaded Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> (Int -> Text) -> Maybe Int -> Text
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Text
"" (\Int
total -> Text
" / " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> Text
tShow @Int Int
total) Maybe Int
total Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Bool -> Text -> Text
forall a. Monoid a => Bool -> a -> a
Monoid.whenM Bool
doneUnpacking Text
" 🏁",
Text
" Imported: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> Text
tShow @Int Int
saved
]
n a -> IO a
forall a. n a -> IO a
toIO (n a -> IO a) -> n a -> IO a
forall a b. (a -> b) -> a -> b
$
ProgressCallbacks -> n a
action (ProgressCallbacks -> n a) -> ProgressCallbacks -> n a
forall a b. (a -> b) -> a -> b
$
ProgressCallbacks
{ $sel:setTotal:ProgressCallbacks :: Int -> IO ()
setTotal = \Int
total -> do IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
IO.atomically (TVar (Maybe Int) -> Maybe Int -> STM ()
forall a. TVar a -> a -> STM ()
IO.writeTVar TVar (Maybe Int)
totalVar (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
total)),
$sel:downloadCounter:ProgressCallbacks :: Int -> IO ()
downloadCounter = \Int
i -> do IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
IO.atomically (TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
IO.modifyTVar' TVar Int
downloadedVar (Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
i)),
$sel:doneDownloading:ProgressCallbacks :: IO ()
doneDownloading = do IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
IO.atomically (TVar Bool -> Bool -> STM ()
forall a. TVar a -> a -> STM ()
IO.writeTVar TVar Bool
doneUnpackingVar Bool
True),
$sel:importCounter:ProgressCallbacks :: Int -> IO ()
importCounter = \Int
i -> do IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
IO.atomically (TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
IO.modifyTVar' TVar Int
savedVar (Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
i))
}
parallelSinkAndSource :: (MonadIO m) => Int -> m (ConduitT i void1 m (), ConduitT void2 i m ())
parallelSinkAndSource :: forall (m :: * -> *) i void1 void2.
MonadIO m =>
Int -> m (ConduitT i void1 m (), ConduitT void2 i m ())
parallelSinkAndSource Int
bufferSize = do
TBMQueue i
q <- IO (TBMQueue i) -> m (TBMQueue i)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (TBMQueue i) -> m (TBMQueue i))
-> IO (TBMQueue i) -> m (TBMQueue i)
forall a b. (a -> b) -> a -> b
$ Int -> IO (TBMQueue i)
forall a. Int -> IO (TBMQueue a)
STM.newTBMQueueIO Int
bufferSize
let sink :: ConduitT i void1 m ()
sink = do
ConduitT i void1 m (Maybe i)
forall (m :: * -> *) i o. Monad m => ConduitT i o m (Maybe i)
C.await ConduitT i void1 m (Maybe i)
-> (Maybe i -> ConduitT i void1 m ()) -> ConduitT i void1 m ()
forall a b.
ConduitT i void1 m a
-> (a -> ConduitT i void1 m b) -> ConduitT i void1 m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Maybe i
Nothing -> STM () -> ConduitT i void1 m ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
STM.atomically (STM () -> ConduitT i void1 m ())
-> STM () -> ConduitT i void1 m ()
forall a b. (a -> b) -> a -> b
$ TBMQueue i -> STM ()
forall a. TBMQueue a -> STM ()
STM.closeTBMQueue TBMQueue i
q
Just i
chunk -> do
STM () -> ConduitT i void1 m ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
STM.atomically (STM () -> ConduitT i void1 m ())
-> STM () -> ConduitT i void1 m ()
forall a b. (a -> b) -> a -> b
$ TBMQueue i -> i -> STM ()
forall a. TBMQueue a -> a -> STM ()
STM.writeTBMQueue TBMQueue i
q i
chunk
ConduitT i void1 m ()
sink
let source :: ConduitT void2 i m ()
source = do
STM (Maybe i) -> ConduitT void2 i m (Maybe i)
forall (m :: * -> *) a. MonadIO m => STM a -> m a
STM.atomically (TBMQueue i -> STM (Maybe i)
forall a. TBMQueue a -> STM (Maybe a)
STM.readTBMQueue TBMQueue i
q) ConduitT void2 i m (Maybe i)
-> (Maybe i -> ConduitT void2 i m ()) -> ConduitT void2 i m ()
forall a b.
ConduitT void2 i m a
-> (a -> ConduitT void2 i m b) -> ConduitT void2 i m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Maybe i
Nothing -> () -> ConduitT void2 i m ()
forall a. a -> ConduitT void2 i m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Just i
chunk -> do
i -> ConduitT void2 i m ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
C.yield i
chunk
ConduitT void2 i m ()
source
(ConduitT i void1 m (), ConduitT void2 i m ())
-> m (ConduitT i void1 m (), ConduitT void2 i m ())
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ConduitT i void1 m ()
sink, ConduitT void2 i m ()
source)