{-# LANGUAGE DataKinds #-}
{-# LANGUAGE TypeOperators #-}

module Unison.Share.SyncV2
  ( syncFromFile,
    syncToFile,
    syncFromCodebase,
    syncFromCodeserver,
  )
where

import Codec.Serialise qualified as CBOR
import Conduit (ConduitT)
import Conduit qualified as C
import Control.Lens
import Control.Monad.Except
import Control.Monad.Reader (ask)
import Control.Monad.ST (ST, stToIO)
import Control.Monad.State
import Data.Attoparsec.ByteString qualified as A
import Data.Attoparsec.ByteString.Char8 qualified as A8
import Data.ByteString qualified as BS
import Data.ByteString.Lazy qualified as BL
import Data.Conduit.Attoparsec qualified as C
import Data.Conduit.Combinators qualified as C
import Data.Conduit.List qualified as CL
import Data.Conduit.Zlib qualified as C
import Data.Foldable qualified as Foldable
import Data.Graph qualified as Graph
import Data.Map qualified as Map
import Data.Proxy
import Data.Set qualified as Set
import Data.Text.IO qualified as Text
import Data.Text.Lazy qualified as Text.Lazy
import Data.Text.Lazy.Encoding qualified as Text.Lazy
import Data.Vector (Vector)
import Data.Vector qualified as Vector
import Network.HTTP.Client qualified as Http.Client
import Network.HTTP.Types qualified as HTTP
import Servant.API qualified as Servant
import Servant.Client.Streaming qualified as Servant
import Servant.Conduit ()
import Servant.Types.SourceT qualified as Servant
import System.Console.Regions qualified as Console.Regions
import U.Codebase.HashTags (CausalHash)
import U.Codebase.Sqlite.Queries qualified as Q
import U.Codebase.Sqlite.TempEntity (TempEntity)
import U.Codebase.Sqlite.V2.HashHandle (v2HashHandle)
import Unison.Auth.HTTPClient qualified as Auth
import Unison.Cli.Monad (Cli)
import Unison.Cli.Monad qualified as Cli
import Unison.Codebase qualified as Codebase
import Unison.Hash32 (Hash32)
import Unison.Prelude
import Unison.Share.API.Hash qualified as Share
import Unison.Share.ExpectedHashMismatches (expectedCausalHashMismatches, expectedComponentHashMismatches)
import Unison.Share.Sync.Types
import Unison.Sqlite qualified as Sqlite
import Unison.Sync.Common (causalHashToHash32, hash32ToCausalHash, tempEntityToEntity)
import Unison.Sync.Common qualified as Sync
import Unison.Sync.EntityValidation qualified as EV
import Unison.Sync.Types qualified as Share
import Unison.Sync.Types qualified as Sync
import Unison.SyncV2.API (Routes (downloadEntitiesStream))
import Unison.SyncV2.API qualified as SyncV2
import Unison.SyncV2.Types (CBORBytes, CBORStream, DependencyType (..))
import Unison.SyncV2.Types qualified as SyncV2
import Unison.Util.Servant.CBOR qualified as CBOR
import Unison.Util.Timing qualified as Timing
import UnliftIO qualified as IO

type Stream i o = ConduitT i o StreamM ()

type SyncErr = SyncError SyncV2.PullError

-- The base monad we use within the conduit pipeline.
type StreamM = (ExceptT SyncErr (C.ResourceT IO))

-- | The number of entities to process in a single transaction.
--
-- SQLite transactions have some fixed overhead, so setting this too low can really slow things down,
-- but going too high here means we may be waiting on the network to get a full batch when we could be starting work.
batchSize :: Int
batchSize :: Int
batchSize = Int
5000

------------------------------------------------------------------------------------------------------------------------
-- Main methods
------------------------------------------------------------------------------------------------------------------------

-- | Sync a given causal hash and its dependencies to a sync-file.
syncToFile ::
  Codebase.Codebase IO v a ->
  -- | Root hash to sync
  CausalHash ->
  -- | Optional name of the branch begin synced
  Maybe SyncV2.BranchRef ->
  -- | Location of the sync-file
  FilePath ->
  IO (Either SyncErr ())
syncToFile :: forall v a.
Codebase IO v a
-> CausalHash
-> Maybe BranchRef
-> FilePath
-> IO (Either SyncErr ())
syncToFile Codebase IO v a
codebase CausalHash
rootHash Maybe BranchRef
mayBranchRef FilePath
destFilePath = do
  IO (Either SyncErr ()) -> IO (Either SyncErr ())
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either SyncErr ()) -> IO (Either SyncErr ()))
-> IO (Either SyncErr ()) -> IO (Either SyncErr ())
forall a b. (a -> b) -> a -> b
$ Codebase IO v a -> forall x. (Connection -> IO x) -> IO x
forall (m :: * -> *) v a.
Codebase m v a -> forall x. (Connection -> m x) -> m x
Codebase.withConnection Codebase IO v a
codebase \Connection
conn -> do
    ResourceT IO (Either SyncErr ()) -> IO (Either SyncErr ())
forall (m :: * -> *) a. MonadUnliftIO m => ResourceT m a -> m a
C.runResourceT (ResourceT IO (Either SyncErr ()) -> IO (Either SyncErr ()))
-> ResourceT IO (Either SyncErr ()) -> IO (Either SyncErr ())
forall a b. (a -> b) -> a -> b
$
      Connection
-> CausalHash
-> Maybe BranchRef
-> (Int
    -> Stream () DownloadEntitiesChunk
    -> ResourceT IO (Either SyncErr ()))
-> ResourceT IO (Either SyncErr ())
forall (m :: * -> *) r.
MonadIO m =>
Connection
-> CausalHash
-> Maybe BranchRef
-> (Int -> Stream () DownloadEntitiesChunk -> m r)
-> m r
withCodebaseEntityStream Connection
conn CausalHash
rootHash Maybe BranchRef
mayBranchRef \Int
mayTotal Stream () DownloadEntitiesChunk
stream -> do
        Maybe Int
-> (ConduitT
      DownloadEntitiesChunk
      DownloadEntitiesChunk
      (ExceptT SyncErr (ResourceT IO))
      ()
    -> ResourceT IO (Either SyncErr ()))
-> ResourceT IO (Either SyncErr ())
forall (m :: * -> *) (n :: * -> *) i a.
(MonadIO m, MonadUnliftIO n) =>
Maybe Int -> (ConduitT i i m () -> n a) -> n a
withStreamProgressCallback (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
mayTotal) \ConduitT
  DownloadEntitiesChunk
  DownloadEntitiesChunk
  (ExceptT SyncErr (ResourceT IO))
  ()
countC -> ExceptT SyncErr (ResourceT IO) ()
-> ResourceT IO (Either SyncErr ())
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT do
          ConduitT () Void (ExceptT SyncErr (ResourceT IO)) ()
-> ExceptT SyncErr (ResourceT IO) ()
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
C.runConduit (ConduitT () Void (ExceptT SyncErr (ResourceT IO)) ()
 -> ExceptT SyncErr (ResourceT IO) ())
-> ConduitT () Void (ExceptT SyncErr (ResourceT IO)) ()
-> ExceptT SyncErr (ResourceT IO) ()
forall a b. (a -> b) -> a -> b
$
            Stream () DownloadEntitiesChunk
stream
              Stream () DownloadEntitiesChunk
-> ConduitT
     DownloadEntitiesChunk Void (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT () Void (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| ConduitT
  DownloadEntitiesChunk
  DownloadEntitiesChunk
  (ExceptT SyncErr (ResourceT IO))
  ()
countC
              ConduitT
  DownloadEntitiesChunk
  DownloadEntitiesChunk
  (ExceptT SyncErr (ResourceT IO))
  ()
-> ConduitT
     DownloadEntitiesChunk Void (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT
     DownloadEntitiesChunk Void (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| (DownloadEntitiesChunk -> ByteString)
-> ConduitT
     DownloadEntitiesChunk
     ByteString
     (ExceptT SyncErr (ResourceT IO))
     ()
forall (m :: * -> *) a b. Monad m => (a -> b) -> ConduitT a b m ()
C.map (ByteString -> ByteString
BL.toStrict (ByteString -> ByteString)
-> (DownloadEntitiesChunk -> ByteString)
-> DownloadEntitiesChunk
-> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DownloadEntitiesChunk -> ByteString
forall a. Serialise a => a -> ByteString
CBOR.serialise)
              ConduitT
  DownloadEntitiesChunk
  ByteString
  (ExceptT SyncErr (ResourceT IO))
  ()
-> ConduitT ByteString Void (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT
     DownloadEntitiesChunk Void (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| (forall a. IO a -> ExceptT SyncErr (ResourceT IO) a)
-> ConduitT ByteString ByteString IO ()
-> ConduitT
     ByteString ByteString (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) (n :: * -> *) i o r.
Monad m =>
(forall a. m a -> n a) -> ConduitT i o m r -> ConduitT i o n r
C.transPipe IO a -> ExceptT SyncErr (ResourceT IO) a
forall a. IO a -> ExceptT SyncErr (ResourceT IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO ConduitT ByteString ByteString IO ()
forall (m :: * -> *).
(MonadThrow m, PrimMonad m) =>
ConduitT ByteString ByteString m ()
C.gzip
              ConduitT ByteString ByteString (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT ByteString Void (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT ByteString Void (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| FilePath
-> ConduitT ByteString Void (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) o.
MonadResource m =>
FilePath -> ConduitT ByteString o m ()
C.sinkFile FilePath
destFilePath

syncFromFile ::
  -- | Whether to validate entities as they're imported.
  Bool ->
  -- | Location of the sync-file
  FilePath ->
  Cli (Either (SyncError SyncV2.PullError) CausalHash)
syncFromFile :: Bool -> FilePath -> Cli (Either SyncErr CausalHash)
syncFromFile Bool
shouldValidate FilePath
syncFilePath = do
  Cli.Env {Codebase IO Symbol Ann
codebase :: Codebase IO Symbol Ann
$sel:codebase:Env :: Env -> Codebase IO Symbol Ann
codebase} <- Cli Env
forall r (m :: * -> *). MonadReader r m => m r
ask
  -- Every insert into SQLite checks the temp entity tables, but syncv2 doesn't actually use them, so it's faster
  -- if we clear them out before starting a sync.
  Transaction () -> Cli ()
forall a. Transaction a -> Cli a
Cli.runTransaction Transaction ()
Q.clearTempEntityTables
  ExceptT SyncErr Cli CausalHash -> Cli (Either SyncErr CausalHash)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT do
    (IO (Either SyncErr CausalHash) -> Cli (Either SyncErr CausalHash))
-> ExceptT SyncErr IO CausalHash -> ExceptT SyncErr Cli CausalHash
forall (m :: * -> *) e a (n :: * -> *) e' b.
(m (Either e a) -> n (Either e' b))
-> ExceptT e m a -> ExceptT e' n b
mapExceptT IO (Either SyncErr CausalHash) -> Cli (Either SyncErr CausalHash)
forall a. IO a -> Cli a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (ExceptT SyncErr IO CausalHash -> ExceptT SyncErr Cli CausalHash)
-> ExceptT SyncErr IO CausalHash -> ExceptT SyncErr Cli CausalHash
forall a b. (a -> b) -> a -> b
$ FilePath
-> ExceptT SyncErr IO CausalHash -> ExceptT SyncErr IO CausalHash
forall (m :: * -> *) a. MonadIO m => FilePath -> m a -> m a
Timing.time FilePath
"File Sync" (ExceptT SyncErr IO CausalHash -> ExceptT SyncErr IO CausalHash)
-> ExceptT SyncErr IO CausalHash -> ExceptT SyncErr IO CausalHash
forall a b. (a -> b) -> a -> b
$ do
      StreamInitInfo
header <- (ResourceT IO (Either SyncErr StreamInitInfo)
 -> IO (Either SyncErr StreamInitInfo))
-> ExceptT SyncErr (ResourceT IO) StreamInitInfo
-> ExceptT SyncErr IO StreamInitInfo
forall (m :: * -> *) e a (n :: * -> *) e' b.
(m (Either e a) -> n (Either e' b))
-> ExceptT e m a -> ExceptT e' n b
mapExceptT ResourceT IO (Either SyncErr StreamInitInfo)
-> IO (Either SyncErr StreamInitInfo)
forall (m :: * -> *) a. MonadUnliftIO m => ResourceT m a -> m a
C.runResourceT (ExceptT SyncErr (ResourceT IO) StreamInitInfo
 -> ExceptT SyncErr IO StreamInitInfo)
-> ExceptT SyncErr (ResourceT IO) StreamInitInfo
-> ExceptT SyncErr IO StreamInitInfo
forall a b. (a -> b) -> a -> b
$ do
        let stream :: Stream () DownloadEntitiesChunk
stream = FilePath
-> ConduitT () ByteString (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) i.
MonadResource m =>
FilePath -> ConduitT i ByteString m ()
C.sourceFile FilePath
syncFilePath ConduitT () ByteString (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT
     ByteString
     DownloadEntitiesChunk
     (ExceptT SyncErr (ResourceT IO))
     ()
-> Stream () DownloadEntitiesChunk
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| ConduitT ByteString ByteString (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *).
(PrimMonad m, MonadThrow m) =>
ConduitT ByteString ByteString m ()
C.ungzip ConduitT ByteString ByteString (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT
     ByteString
     DownloadEntitiesChunk
     (ExceptT SyncErr (ResourceT IO))
     ()
-> ConduitT
     ByteString
     DownloadEntitiesChunk
     (ExceptT SyncErr (ResourceT IO))
     ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| ConduitT
  ByteString
  DownloadEntitiesChunk
  (ExceptT SyncErr (ResourceT IO))
  ()
forall a. Serialise a => Stream ByteString a
decodeUnframedEntities
        (StreamInitInfo
header, Stream () EntityChunk
rest) <- Stream () DownloadEntitiesChunk
-> StreamM (StreamInitInfo, Stream () EntityChunk)
initializeStream Stream () DownloadEntitiesChunk
stream
        Bool
-> Codebase IO Symbol Ann
-> StreamInitInfo
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
forall v a.
Bool
-> Codebase IO v a
-> StreamInitInfo
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
streamIntoCodebase Bool
shouldValidate Codebase IO Symbol Ann
codebase StreamInitInfo
header Stream () EntityChunk
rest
        pure StreamInitInfo
header
      Codebase IO Symbol Ann -> Hash32 -> ExceptT SyncErr IO ()
forall v a. Codebase IO v a -> Hash32 -> ExceptT SyncErr IO ()
afterSyncChecks Codebase IO Symbol Ann
codebase (StreamInitInfo -> Hash32
SyncV2.rootCausalHash StreamInitInfo
header)
      CausalHash -> ExceptT SyncErr IO CausalHash
forall a. a -> ExceptT SyncErr IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (CausalHash -> ExceptT SyncErr IO CausalHash)
-> (Hash32 -> CausalHash)
-> Hash32
-> ExceptT SyncErr IO CausalHash
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Hash32 -> CausalHash
hash32ToCausalHash (Hash32 -> ExceptT SyncErr IO CausalHash)
-> Hash32 -> ExceptT SyncErr IO CausalHash
forall a b. (a -> b) -> a -> b
$ StreamInitInfo -> Hash32
SyncV2.rootCausalHash StreamInitInfo
header

syncFromCodebase ::
  Bool ->
  -- | The codebase to sync from.
  Sqlite.Connection ->
  (Codebase.Codebase IO v a) ->
  -- | The hash to sync.
  CausalHash ->
  IO (Either (SyncError SyncV2.PullError) ())
syncFromCodebase :: forall v a.
Bool
-> Connection
-> Codebase IO v a
-> CausalHash
-> IO (Either SyncErr ())
syncFromCodebase Bool
shouldValidate Connection
srcConn Codebase IO v a
destCodebase CausalHash
causalHash = do
  -- Every insert into SQLite checks the temp entity tables, but syncv2 doesn't actually use them, so it's faster
  -- if we clear them out before starting a sync.
  Connection -> Transaction () -> IO ()
forall (m :: * -> *) a.
(MonadIO m, HasCallStack) =>
Connection -> Transaction a -> m a
Sqlite.runTransaction Connection
srcConn Transaction ()
Q.clearTempEntityTables
  IO (Either SyncErr ()) -> IO (Either SyncErr ())
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either SyncErr ()) -> IO (Either SyncErr ()))
-> (ExceptT SyncErr (ResourceT IO) () -> IO (Either SyncErr ()))
-> ExceptT SyncErr (ResourceT IO) ()
-> IO (Either SyncErr ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ResourceT IO (Either SyncErr ()) -> IO (Either SyncErr ())
forall (m :: * -> *) a. MonadUnliftIO m => ResourceT m a -> m a
C.runResourceT (ResourceT IO (Either SyncErr ()) -> IO (Either SyncErr ()))
-> (ExceptT SyncErr (ResourceT IO) ()
    -> ResourceT IO (Either SyncErr ()))
-> ExceptT SyncErr (ResourceT IO) ()
-> IO (Either SyncErr ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ExceptT SyncErr (ResourceT IO) ()
-> ResourceT IO (Either SyncErr ())
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT SyncErr (ResourceT IO) () -> IO (Either SyncErr ()))
-> ExceptT SyncErr (ResourceT IO) () -> IO (Either SyncErr ())
forall a b. (a -> b) -> a -> b
$ Connection
-> CausalHash
-> Maybe BranchRef
-> (Int
    -> Stream () DownloadEntitiesChunk
    -> ExceptT SyncErr (ResourceT IO) ())
-> ExceptT SyncErr (ResourceT IO) ()
forall (m :: * -> *) r.
MonadIO m =>
Connection
-> CausalHash
-> Maybe BranchRef
-> (Int -> Stream () DownloadEntitiesChunk -> m r)
-> m r
withCodebaseEntityStream Connection
srcConn CausalHash
causalHash Maybe BranchRef
forall a. Maybe a
Nothing \Int
_total Stream () DownloadEntitiesChunk
entityStream -> do
    (StreamInitInfo
header, Stream () EntityChunk
rest) <- Stream () DownloadEntitiesChunk
-> StreamM (StreamInitInfo, Stream () EntityChunk)
initializeStream Stream () DownloadEntitiesChunk
entityStream
    Bool
-> Codebase IO v a
-> StreamInitInfo
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
forall v a.
Bool
-> Codebase IO v a
-> StreamInitInfo
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
streamIntoCodebase Bool
shouldValidate Codebase IO v a
destCodebase StreamInitInfo
header Stream () EntityChunk
rest
    (IO (Either SyncErr ()) -> ResourceT IO (Either SyncErr ()))
-> ExceptT SyncErr IO () -> ExceptT SyncErr (ResourceT IO) ()
forall (m :: * -> *) e a (n :: * -> *) e' b.
(m (Either e a) -> n (Either e' b))
-> ExceptT e m a -> ExceptT e' n b
mapExceptT IO (Either SyncErr ()) -> ResourceT IO (Either SyncErr ())
forall a. IO a -> ResourceT IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Codebase IO v a -> Hash32 -> ExceptT SyncErr IO ()
forall v a. Codebase IO v a -> Hash32 -> ExceptT SyncErr IO ()
afterSyncChecks Codebase IO v a
destCodebase (CausalHash -> Hash32
causalHashToHash32 CausalHash
causalHash))

syncFromCodeserver ::
  Bool ->
  -- | The Unison Share URL.
  Servant.BaseUrl ->
  -- | The branch to download from.
  SyncV2.BranchRef ->
  -- | The hash to download.
  Share.HashJWT ->
  -- | Callback that's given a number of entities we just downloaded.
  (Int -> IO ()) ->
  Cli (Either (SyncError SyncV2.PullError) ())
syncFromCodeserver :: Bool
-> BaseUrl
-> BranchRef
-> HashJWT
-> (Int -> IO ())
-> Cli (Either SyncErr ())
syncFromCodeserver Bool
shouldValidate BaseUrl
unisonShareUrl BranchRef
branchRef HashJWT
hashJwt Int -> IO ()
_downloadedCallback = do
  Cli.Env {AuthenticatedHttpClient
authHTTPClient :: AuthenticatedHttpClient
$sel:authHTTPClient:Env :: Env -> AuthenticatedHttpClient
authHTTPClient, Codebase IO Symbol Ann
$sel:codebase:Env :: Env -> Codebase IO Symbol Ann
codebase :: Codebase IO Symbol Ann
codebase} <- Cli Env
forall r (m :: * -> *). MonadReader r m => m r
ask
  -- Every insert into SQLite checks the temp entity tables, but syncv2 doesn't actually use them, so it's faster
  -- if we clear them out before starting a sync.
  Transaction () -> Cli ()
forall a. Transaction a -> Cli a
Cli.runTransaction Transaction ()
Q.clearTempEntityTables
  ExceptT SyncErr Cli () -> Cli (Either SyncErr ())
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT do
    Set Hash32
knownHashes <- Cli (Either SyncErr (Set Hash32))
-> ExceptT SyncErr Cli (Set Hash32)
forall e (m :: * -> *) a. m (Either e a) -> ExceptT e m a
ExceptT (Cli (Either SyncErr (Set Hash32))
 -> ExceptT SyncErr Cli (Set Hash32))
-> Cli (Either SyncErr (Set Hash32))
-> ExceptT SyncErr Cli (Set Hash32)
forall a b. (a -> b) -> a -> b
$ BaseUrl
-> BranchRef -> HashJWT -> Cli (Either SyncErr (Set Hash32))
negotiateKnownCausals BaseUrl
unisonShareUrl BranchRef
branchRef HashJWT
hashJwt
    let hash :: Hash32
hash = HashJWT -> Hash32
Share.hashJWTHash HashJWT
hashJwt
    Cli (Either SyncErr ()) -> ExceptT SyncErr Cli ()
forall e (m :: * -> *) a. m (Either e a) -> ExceptT e m a
ExceptT (Cli (Either SyncErr ()) -> ExceptT SyncErr Cli ())
-> Cli (Either SyncErr ()) -> ExceptT SyncErr Cli ()
forall a b. (a -> b) -> a -> b
$ do
      (Transaction (Maybe EntityLocation) -> Cli (Maybe EntityLocation)
forall a. Transaction a -> Cli a
Cli.runTransaction (Hash32 -> Transaction (Maybe EntityLocation)
Q.entityLocation Hash32
hash)) Cli (Maybe EntityLocation)
-> (Maybe EntityLocation -> Cli (Either SyncErr ()))
-> Cli (Either SyncErr ())
forall a b. Cli a -> (a -> Cli b) -> Cli b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        Just EntityLocation
Q.EntityInMainStorage -> Either SyncErr () -> Cli (Either SyncErr ())
forall a. a -> Cli a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either SyncErr () -> Cli (Either SyncErr ()))
-> Either SyncErr () -> Cli (Either SyncErr ())
forall a b. (a -> b) -> a -> b
$ () -> Either SyncErr ()
forall a b. b -> Either a b
Right ()
        Maybe EntityLocation
_ -> do
          FilePath -> Cli (Either SyncErr ()) -> Cli (Either SyncErr ())
forall (m :: * -> *) a. MonadIO m => FilePath -> m a -> m a
Timing.time FilePath
"Entity Download" (Cli (Either SyncErr ()) -> Cli (Either SyncErr ()))
-> Cli (Either SyncErr ()) -> Cli (Either SyncErr ())
forall a b. (a -> b) -> a -> b
$ do
            IO (Either SyncErr ()) -> Cli (Either SyncErr ())
forall a. IO a -> Cli a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either SyncErr ()) -> Cli (Either SyncErr ()))
-> (ExceptT SyncErr (ResourceT IO) () -> IO (Either SyncErr ()))
-> ExceptT SyncErr (ResourceT IO) ()
-> Cli (Either SyncErr ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ResourceT IO (Either SyncErr ()) -> IO (Either SyncErr ())
forall (m :: * -> *) a. MonadUnliftIO m => ResourceT m a -> m a
C.runResourceT (ResourceT IO (Either SyncErr ()) -> IO (Either SyncErr ()))
-> (ExceptT SyncErr (ResourceT IO) ()
    -> ResourceT IO (Either SyncErr ()))
-> ExceptT SyncErr (ResourceT IO) ()
-> IO (Either SyncErr ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ExceptT SyncErr (ResourceT IO) ()
-> ResourceT IO (Either SyncErr ())
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT SyncErr (ResourceT IO) () -> Cli (Either SyncErr ()))
-> ExceptT SyncErr (ResourceT IO) () -> Cli (Either SyncErr ())
forall a b. (a -> b) -> a -> b
$ AuthenticatedHttpClient
-> BaseUrl
-> DownloadEntitiesRequest
-> (StreamInitInfo
    -> Stream () EntityChunk -> ExceptT SyncErr (ResourceT IO) ())
-> ExceptT SyncErr (ResourceT IO) ()
httpStreamEntities
              AuthenticatedHttpClient
authHTTPClient
              BaseUrl
unisonShareUrl
              SyncV2.DownloadEntitiesRequest {BranchRef
branchRef :: BranchRef
$sel:branchRef:DownloadEntitiesRequest :: BranchRef
branchRef, $sel:causalHash:DownloadEntitiesRequest :: HashJWT
causalHash = HashJWT
hashJwt, Set Hash32
knownHashes :: Set Hash32
$sel:knownHashes:DownloadEntitiesRequest :: Set Hash32
knownHashes}
              \StreamInitInfo
header Stream () EntityChunk
stream -> do
                Bool
-> Codebase IO Symbol Ann
-> StreamInitInfo
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
forall v a.
Bool
-> Codebase IO v a
-> StreamInitInfo
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
streamIntoCodebase Bool
shouldValidate Codebase IO Symbol Ann
codebase StreamInitInfo
header Stream () EntityChunk
stream
    (IO (Either SyncErr ()) -> Cli (Either SyncErr ()))
-> ExceptT SyncErr IO () -> ExceptT SyncErr Cli ()
forall (m :: * -> *) e a (n :: * -> *) e' b.
(m (Either e a) -> n (Either e' b))
-> ExceptT e m a -> ExceptT e' n b
mapExceptT IO (Either SyncErr ()) -> Cli (Either SyncErr ())
forall a. IO a -> Cli a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Codebase IO Symbol Ann -> Hash32 -> ExceptT SyncErr IO ()
forall v a. Codebase IO v a -> Hash32 -> ExceptT SyncErr IO ()
afterSyncChecks Codebase IO Symbol Ann
codebase Hash32
hash)

------------------------------------------------------------------------------------------------------------------------
-- Helpers
------------------------------------------------------------------------------------------------------------------------

-- | Validate that the provided entities match their expected hashes, and if so, save them to the codebase.
validateAndSave :: Bool -> (Codebase.Codebase IO v a) -> Vector (Hash32, TempEntity) -> StreamM ()
validateAndSave :: forall v a.
Bool
-> Codebase IO v a
-> Vector (Hash32, TempEntity)
-> ExceptT SyncErr (ResourceT IO) ()
validateAndSave Bool
shouldValidate Codebase IO v a
codebase Vector (Hash32, TempEntity)
entities = do
  let validateEntities :: IO (Either SyncErr ())
validateEntities =
        ExceptT SyncErr IO () -> IO (Either SyncErr ())
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT SyncErr IO () -> IO (Either SyncErr ()))
-> ExceptT SyncErr IO () -> IO (Either SyncErr ())
forall a b. (a -> b) -> a -> b
$ Bool -> ExceptT SyncErr IO () -> ExceptT SyncErr IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
shouldValidate (Vector (Hash32, TempEntity) -> ExceptT SyncErr IO ()
batchValidateEntities Vector (Hash32, TempEntity)
entities)
  -- Validation is slow, so we run it in parallel with insertion (which can also be slow),
  -- but we don't commit the transaction until we're done validation to avoid inserting invalid entities.
  ResourceT IO (Either SyncErr ())
-> ExceptT SyncErr (ResourceT IO) ()
forall e (m :: * -> *) a. m (Either e a) -> ExceptT e m a
ExceptT (ResourceT IO (Either SyncErr ())
 -> ExceptT SyncErr (ResourceT IO) ())
-> (IO (Either SyncErr ()) -> ResourceT IO (Either SyncErr ()))
-> IO (Either SyncErr ())
-> ExceptT SyncErr (ResourceT IO) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO (Either SyncErr ()) -> ResourceT IO (Either SyncErr ())
forall a. IO a -> ResourceT IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either SyncErr ()) -> ExceptT SyncErr (ResourceT IO) ())
-> IO (Either SyncErr ()) -> ExceptT SyncErr (ResourceT IO) ()
forall a b. (a -> b) -> a -> b
$ IO (Either SyncErr ())
-> (Async (Either SyncErr ()) -> IO (Either SyncErr ()))
-> IO (Either SyncErr ())
forall (m :: * -> *) a b.
MonadUnliftIO m =>
m a -> (Async a -> m b) -> m b
IO.withAsync IO (Either SyncErr ())
validateEntities \Async (Either SyncErr ())
validationTask -> do
    FilePath -> IO (Either SyncErr ()) -> IO (Either SyncErr ())
forall (m :: * -> *) a. MonadIO m => FilePath -> m a -> m a
Timing.time FilePath
"Inserting entities" (IO (Either SyncErr ()) -> IO (Either SyncErr ()))
-> IO (Either SyncErr ()) -> IO (Either SyncErr ())
forall a b. (a -> b) -> a -> b
$ Codebase IO v a
-> ExceptT SyncErr Transaction () -> IO (Either SyncErr ())
forall (m :: * -> *) v a e b.
MonadIO m =>
Codebase m v a -> ExceptT e Transaction b -> m (Either e b)
Codebase.runTransactionExceptT Codebase IO v a
codebase do
      Vector (Hash32, TempEntity)
-> ((Hash32, TempEntity) -> ExceptT SyncErr Transaction ())
-> ExceptT SyncErr Transaction ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ Vector (Hash32, TempEntity)
entities \(Hash32
hash, TempEntity
entity) -> do
        ExceptT SyncErr Transaction (Either CausalHashId ObjectId)
-> ExceptT SyncErr Transaction ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (ExceptT SyncErr Transaction (Either CausalHashId ObjectId)
 -> ExceptT SyncErr Transaction ())
-> (Transaction (Either CausalHashId ObjectId)
    -> ExceptT SyncErr Transaction (Either CausalHashId ObjectId))
-> Transaction (Either CausalHashId ObjectId)
-> ExceptT SyncErr Transaction ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Transaction (Either CausalHashId ObjectId)
-> ExceptT SyncErr Transaction (Either CausalHashId ObjectId)
forall (m :: * -> *) a. Monad m => m a -> ExceptT SyncErr m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (Transaction (Either CausalHashId ObjectId)
 -> ExceptT SyncErr Transaction ())
-> Transaction (Either CausalHashId ObjectId)
-> ExceptT SyncErr Transaction ()
forall a b. (a -> b) -> a -> b
$ HashHandle
-> Hash32
-> TempEntity
-> Transaction (Either CausalHashId ObjectId)
Q.saveTempEntityInMain HashHandle
v2HashHandle Hash32
hash TempEntity
entity
      Transaction (Either SyncErr ())
-> ExceptT SyncErr Transaction (Either SyncErr ())
forall (m :: * -> *) a. Monad m => m a -> ExceptT SyncErr m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (IO (Either SyncErr ()) -> Transaction (Either SyncErr ())
forall a. HasCallStack => IO a -> Transaction a
Sqlite.unsafeIO (Async (Either SyncErr ()) -> IO (Either SyncErr ())
forall (m :: * -> *) a. MonadIO m => Async a -> m a
IO.wait Async (Either SyncErr ())
validationTask)) ExceptT SyncErr Transaction (Either SyncErr ())
-> (Either SyncErr () -> ExceptT SyncErr Transaction ())
-> ExceptT SyncErr Transaction ()
forall a b.
ExceptT SyncErr Transaction a
-> (a -> ExceptT SyncErr Transaction b)
-> ExceptT SyncErr Transaction b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        Left SyncErr
err -> SyncErr -> ExceptT SyncErr Transaction ()
forall a. SyncErr -> ExceptT SyncErr Transaction a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError SyncErr
err
        Right ()
_ -> () -> ExceptT SyncErr Transaction ()
forall a. a -> ExceptT SyncErr Transaction a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

-- | Validate that a batch of entities matches the hashes they're keyed by, throwing an error if any of them fail validation.
batchValidateEntities :: Vector (Hash32, TempEntity) -> ExceptT SyncErr IO ()
batchValidateEntities :: Vector (Hash32, TempEntity) -> ExceptT SyncErr IO ()
batchValidateEntities Vector (Hash32, TempEntity)
entities = do
  Vector EntityValidationError
mismatches <- (Vector (Maybe EntityValidationError)
 -> Vector EntityValidationError)
-> ExceptT SyncErr IO (Vector (Maybe EntityValidationError))
-> ExceptT SyncErr IO (Vector EntityValidationError)
forall a b.
(a -> b) -> ExceptT SyncErr IO a -> ExceptT SyncErr IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Vector (Maybe EntityValidationError)
-> Vector EntityValidationError
forall a. Vector (Maybe a) -> Vector a
Vector.catMaybes (ExceptT SyncErr IO (Vector (Maybe EntityValidationError))
 -> ExceptT SyncErr IO (Vector EntityValidationError))
-> ExceptT SyncErr IO (Vector (Maybe EntityValidationError))
-> ExceptT SyncErr IO (Vector EntityValidationError)
forall a b. (a -> b) -> a -> b
$ IO (Vector (Maybe EntityValidationError))
-> ExceptT SyncErr IO (Vector (Maybe EntityValidationError))
forall a. IO a -> ExceptT SyncErr IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Vector (Maybe EntityValidationError))
 -> ExceptT SyncErr IO (Vector (Maybe EntityValidationError)))
-> IO (Vector (Maybe EntityValidationError))
-> ExceptT SyncErr IO (Vector (Maybe EntityValidationError))
forall a b. (a -> b) -> a -> b
$ Vector (Hash32, TempEntity)
-> ((Hash32, TempEntity) -> IO (Maybe EntityValidationError))
-> IO (Vector (Maybe EntityValidationError))
forall (m :: * -> *) (t :: * -> *) a b.
(MonadUnliftIO m, Traversable t) =>
t a -> (a -> m b) -> m (t b)
IO.pooledForConcurrently Vector (Hash32, TempEntity)
entities \(Hash32
hash, TempEntity
entity) -> do
    Maybe EntityValidationError -> IO (Maybe EntityValidationError)
forall (m :: * -> *) a. MonadIO m => a -> m a
IO.evaluate (Maybe EntityValidationError -> IO (Maybe EntityValidationError))
-> Maybe EntityValidationError -> IO (Maybe EntityValidationError)
forall a b. (a -> b) -> a -> b
$ Hash32 -> TempEntity -> Maybe EntityValidationError
EV.validateTempEntity Hash32
hash TempEntity
entity
  Vector EntityValidationError
-> (EntityValidationError -> ExceptT SyncErr IO ())
-> ExceptT SyncErr IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ Vector EntityValidationError
mismatches \case
    err :: EntityValidationError
err@(Share.EntityHashMismatch EntityType
et (Share.HashMismatchForEntity {Hash32
supplied :: Hash32
$sel:supplied:HashMismatchForEntity :: HashMismatchForEntity -> Hash32
supplied, Hash32
computed :: Hash32
$sel:computed:HashMismatchForEntity :: HashMismatchForEntity -> Hash32
computed})) ->
      let expectedMismatches :: Map Hash32 Hash32
expectedMismatches = case EntityType
et of
            EntityType
Share.TermComponentType -> Map Hash32 Hash32
expectedComponentHashMismatches
            EntityType
Share.DeclComponentType -> Map Hash32 Hash32
expectedComponentHashMismatches
            EntityType
Share.CausalType -> Map Hash32 Hash32
expectedCausalHashMismatches
            EntityType
_ -> Map Hash32 Hash32
forall a. Monoid a => a
mempty
       in case Hash32 -> Map Hash32 Hash32 -> Maybe Hash32
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup Hash32
supplied Map Hash32 Hash32
expectedMismatches of
            Just Hash32
expected
              | Hash32
expected Hash32 -> Hash32 -> Bool
forall a. Eq a => a -> a -> Bool
== Hash32
computed -> () -> ExceptT SyncErr IO ()
forall a. a -> ExceptT SyncErr IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
            Maybe Hash32
_ -> do
              SyncErr -> ExceptT SyncErr IO ()
forall a. SyncErr -> ExceptT SyncErr IO a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> ExceptT SyncErr IO ())
-> (EntityValidationError -> SyncErr)
-> EntityValidationError
-> ExceptT SyncErr IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (EntityValidationError -> PullError)
-> EntityValidationError
-> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DownloadEntitiesError -> PullError
SyncV2.PullError'DownloadEntities (DownloadEntitiesError -> PullError)
-> (EntityValidationError -> DownloadEntitiesError)
-> EntityValidationError
-> PullError
forall b c a. (b -> c) -> (a -> b) -> a -> c
. EntityValidationError -> DownloadEntitiesError
SyncV2.DownloadEntitiesEntityValidationFailure (EntityValidationError -> ExceptT SyncErr IO ())
-> EntityValidationError -> ExceptT SyncErr IO ()
forall a b. (a -> b) -> a -> b
$ EntityValidationError
err
    EntityValidationError
err -> do
      SyncErr -> ExceptT SyncErr IO ()
forall a. SyncErr -> ExceptT SyncErr IO a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> ExceptT SyncErr IO ())
-> (EntityValidationError -> SyncErr)
-> EntityValidationError
-> ExceptT SyncErr IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (EntityValidationError -> PullError)
-> EntityValidationError
-> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DownloadEntitiesError -> PullError
SyncV2.PullError'DownloadEntities (DownloadEntitiesError -> PullError)
-> (EntityValidationError -> DownloadEntitiesError)
-> EntityValidationError
-> PullError
forall b c a. (b -> c) -> (a -> b) -> a -> c
. EntityValidationError -> DownloadEntitiesError
SyncV2.DownloadEntitiesEntityValidationFailure (EntityValidationError -> ExceptT SyncErr IO ())
-> EntityValidationError -> ExceptT SyncErr IO ()
forall a b. (a -> b) -> a -> b
$ EntityValidationError
err

-- | Syncs a stream which could send entities in any order.
syncUnsortedStream ::
  Bool ->
  (Codebase.Codebase IO v a) ->
  Stream () SyncV2.EntityChunk ->
  StreamM ()
syncUnsortedStream :: forall v a.
Bool
-> Codebase IO v a
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
syncUnsortedStream Bool
shouldValidate Codebase IO v a
codebase Stream () EntityChunk
stream = do
  Vector (Hash32, TempEntity)
allEntities <-
    ConduitT
  ()
  Void
  (ExceptT SyncErr (ResourceT IO))
  (Vector (Hash32, TempEntity))
-> ExceptT SyncErr (ResourceT IO) (Vector (Hash32, TempEntity))
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
C.runConduit (ConduitT
   ()
   Void
   (ExceptT SyncErr (ResourceT IO))
   (Vector (Hash32, TempEntity))
 -> ExceptT SyncErr (ResourceT IO) (Vector (Hash32, TempEntity)))
-> ConduitT
     ()
     Void
     (ExceptT SyncErr (ResourceT IO))
     (Vector (Hash32, TempEntity))
-> ExceptT SyncErr (ResourceT IO) (Vector (Hash32, TempEntity))
forall a b. (a -> b) -> a -> b
$
      Stream () EntityChunk
stream
        Stream () EntityChunk
-> ConduitT
     EntityChunk
     Void
     (ExceptT SyncErr (ResourceT IO))
     (Vector (Hash32, TempEntity))
-> ConduitT
     ()
     Void
     (ExceptT SyncErr (ResourceT IO))
     (Vector (Hash32, TempEntity))
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| Int
-> ConduitT
     EntityChunk [EntityChunk] (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a. Monad m => Int -> ConduitT a [a] m ()
CL.chunksOf Int
batchSize
        ConduitT
  EntityChunk [EntityChunk] (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT
     [EntityChunk]
     Void
     (ExceptT SyncErr (ResourceT IO))
     (Vector (Hash32, TempEntity))
-> ConduitT
     EntityChunk
     Void
     (ExceptT SyncErr (ResourceT IO))
     (Vector (Hash32, TempEntity))
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| Codebase IO v a
-> Stream [EntityChunk] (Vector (Hash32, TempEntity))
forall v a.
Codebase IO v a
-> Stream [EntityChunk] (Vector (Hash32, TempEntity))
unpackChunks Codebase IO v a
codebase
        Stream [EntityChunk] (Vector (Hash32, TempEntity))
-> ConduitT
     (Vector (Hash32, TempEntity))
     Void
     (ExceptT SyncErr (ResourceT IO))
     (Vector (Hash32, TempEntity))
-> ConduitT
     [EntityChunk]
     Void
     (ExceptT SyncErr (ResourceT IO))
     (Vector (Hash32, TempEntity))
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| Stream (Vector (Hash32, TempEntity)) (Vector (Hash32, TempEntity))
validateBatch
        Stream (Vector (Hash32, TempEntity)) (Vector (Hash32, TempEntity))
-> ConduitT
     (Vector (Hash32, TempEntity))
     Void
     (ExceptT SyncErr (ResourceT IO))
     (Vector (Hash32, TempEntity))
-> ConduitT
     (Vector (Hash32, TempEntity))
     Void
     (ExceptT SyncErr (ResourceT IO))
     (Vector (Hash32, TempEntity))
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| ConduitT
  (Vector (Hash32, TempEntity))
  (Hash32, TempEntity)
  (ExceptT SyncErr (ResourceT IO))
  ()
ConduitT
  (Vector (Hash32, TempEntity))
  (Element (Vector (Hash32, TempEntity)))
  (ExceptT SyncErr (ResourceT IO))
  ()
forall (m :: * -> *) mono.
(Monad m, MonoFoldable mono) =>
ConduitT mono (Element mono) m ()
C.concat
        ConduitT
  (Vector (Hash32, TempEntity))
  (Hash32, TempEntity)
  (ExceptT SyncErr (ResourceT IO))
  ()
-> ConduitT
     (Hash32, TempEntity)
     Void
     (ExceptT SyncErr (ResourceT IO))
     (Vector (Hash32, TempEntity))
-> ConduitT
     (Vector (Hash32, TempEntity))
     Void
     (ExceptT SyncErr (ResourceT IO))
     (Vector (Hash32, TempEntity))
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| forall (v :: * -> *) a (m :: * -> *) o.
(Vector v a, PrimMonad m) =>
ConduitT a o m (v a)
C.sinkVector @Vector
  let sortedEntities :: [(Hash32, TempEntity)]
sortedEntities = Vector (Hash32, TempEntity) -> [(Hash32, TempEntity)]
forall (f :: * -> *).
(Foldable f, Functor f) =>
f (Hash32, TempEntity) -> [(Hash32, TempEntity)]
sortDependencyFirst Vector (Hash32, TempEntity)
allEntities
  IO () -> ExceptT SyncErr (ResourceT IO) ()
forall a. IO a -> ExceptT SyncErr (ResourceT IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ExceptT SyncErr (ResourceT IO) ())
-> IO () -> ExceptT SyncErr (ResourceT IO) ()
forall a b. (a -> b) -> a -> b
$ Maybe Int -> ((Int -> IO ()) -> IO ()) -> IO ()
forall (m :: * -> *) a.
MonadUnliftIO m =>
Maybe Int -> ((Int -> m ()) -> m a) -> m a
withEntitySavingCallback (Int -> Maybe Int
forall a. a -> Maybe a
Just (Int -> Maybe Int) -> Int -> Maybe Int
forall a b. (a -> b) -> a -> b
$ Vector (Hash32, TempEntity) -> Int
forall a. Vector a -> Int
Vector.length Vector (Hash32, TempEntity)
allEntities) \Int -> IO ()
countC -> do
    Codebase IO v a -> Transaction () -> IO ()
forall (m :: * -> *) v a b.
MonadIO m =>
Codebase m v a -> Transaction b -> m b
Codebase.runTransaction Codebase IO v a
codebase (Transaction () -> IO ()) -> Transaction () -> IO ()
forall a b. (a -> b) -> a -> b
$ [(Hash32, TempEntity)]
-> ((Hash32, TempEntity)
    -> Transaction (Either CausalHashId ObjectId))
-> Transaction ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ [(Hash32, TempEntity)]
sortedEntities \(Hash32
hash, TempEntity
entity) -> do
      Either CausalHashId ObjectId
r <- HashHandle
-> Hash32
-> TempEntity
-> Transaction (Either CausalHashId ObjectId)
Q.saveTempEntityInMain HashHandle
v2HashHandle Hash32
hash TempEntity
entity
      IO () -> Transaction ()
forall a. HasCallStack => IO a -> Transaction a
Sqlite.unsafeIO (IO () -> Transaction ()) -> IO () -> Transaction ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
countC Int
1
      pure Either CausalHashId ObjectId
r
  where
    validateBatch :: Stream (Vector (Hash32, TempEntity)) (Vector (Hash32, TempEntity))
    validateBatch :: Stream (Vector (Hash32, TempEntity)) (Vector (Hash32, TempEntity))
validateBatch = (Vector (Hash32, TempEntity) -> ExceptT SyncErr (ResourceT IO) ())
-> Stream
     (Vector (Hash32, TempEntity)) (Vector (Hash32, TempEntity))
forall (m :: * -> *) a. Monad m => (a -> m ()) -> ConduitT a a m ()
C.iterM \Vector (Hash32, TempEntity)
entities -> do
      Bool
-> ExceptT SyncErr (ResourceT IO) ()
-> ExceptT SyncErr (ResourceT IO) ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
shouldValidate ((IO (Either SyncErr ()) -> ResourceT IO (Either SyncErr ()))
-> ExceptT SyncErr IO () -> ExceptT SyncErr (ResourceT IO) ()
forall (m :: * -> *) e a (n :: * -> *) e' b.
(m (Either e a) -> n (Either e' b))
-> ExceptT e m a -> ExceptT e' n b
mapExceptT IO (Either SyncErr ()) -> ResourceT IO (Either SyncErr ())
forall (m :: * -> *) a. Monad m => m a -> ResourceT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ExceptT SyncErr IO () -> ExceptT SyncErr (ResourceT IO) ())
-> ExceptT SyncErr IO () -> ExceptT SyncErr (ResourceT IO) ()
forall a b. (a -> b) -> a -> b
$ Vector (Hash32, TempEntity) -> ExceptT SyncErr IO ()
batchValidateEntities Vector (Hash32, TempEntity)
entities)

-- | Syncs a stream which sends entities which are already sorted in dependency order.
-- This allows us to stream them directly into the codebase as they're received.
syncSortedStream ::
  Bool ->
  (Codebase.Codebase IO v a) ->
  Stream () SyncV2.EntityChunk ->
  StreamM ()
syncSortedStream :: forall v a.
Bool
-> Codebase IO v a
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
syncSortedStream Bool
shouldValidate Codebase IO v a
codebase Stream () EntityChunk
stream = do
  let handler :: Stream (Vector (Hash32, TempEntity)) o
      handler :: forall o. Stream (Vector (Hash32, TempEntity)) o
handler = (Vector (Hash32, TempEntity) -> ExceptT SyncErr (ResourceT IO) ())
-> ConduitT
     (Vector (Hash32, TempEntity)) o (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a o.
Monad m =>
(a -> m ()) -> ConduitT a o m ()
C.mapM_C \Vector (Hash32, TempEntity)
entityBatch -> do
        Bool
-> Codebase IO v a
-> Vector (Hash32, TempEntity)
-> ExceptT SyncErr (ResourceT IO) ()
forall v a.
Bool
-> Codebase IO v a
-> Vector (Hash32, TempEntity)
-> ExceptT SyncErr (ResourceT IO) ()
validateAndSave Bool
shouldValidate Codebase IO v a
codebase Vector (Hash32, TempEntity)
entityBatch
  ConduitT () Void (ExceptT SyncErr (ResourceT IO)) ()
-> ExceptT SyncErr (ResourceT IO) ()
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
C.runConduit (ConduitT () Void (ExceptT SyncErr (ResourceT IO)) ()
 -> ExceptT SyncErr (ResourceT IO) ())
-> ConduitT () Void (ExceptT SyncErr (ResourceT IO)) ()
-> ExceptT SyncErr (ResourceT IO) ()
forall a b. (a -> b) -> a -> b
$
    Stream () EntityChunk
stream
      Stream () EntityChunk
-> ConduitT EntityChunk Void (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT () Void (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| Int
-> ConduitT
     EntityChunk [EntityChunk] (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a. Monad m => Int -> ConduitT a [a] m ()
CL.chunksOf Int
batchSize
      ConduitT
  EntityChunk [EntityChunk] (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT [EntityChunk] Void (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT EntityChunk Void (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| Codebase IO v a
-> Stream [EntityChunk] (Vector (Hash32, TempEntity))
forall v a.
Codebase IO v a
-> Stream [EntityChunk] (Vector (Hash32, TempEntity))
unpackChunks Codebase IO v a
codebase
      Stream [EntityChunk] (Vector (Hash32, TempEntity))
-> ConduitT
     (Vector (Hash32, TempEntity))
     Void
     (ExceptT SyncErr (ResourceT IO))
     ()
-> ConduitT [EntityChunk] Void (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| ConduitT
  (Vector (Hash32, TempEntity))
  Void
  (ExceptT SyncErr (ResourceT IO))
  ()
forall o. Stream (Vector (Hash32, TempEntity)) o
handler

-- | Topologically sort entities based on their dependencies, returning a list in dependency-first order.
sortDependencyFirst :: (Foldable f, Functor f) => f (Hash32, TempEntity) -> [(Hash32, TempEntity)]
sortDependencyFirst :: forall (f :: * -> *).
(Foldable f, Functor f) =>
f (Hash32, TempEntity) -> [(Hash32, TempEntity)]
sortDependencyFirst f (Hash32, TempEntity)
entities = do
  let adjList :: f ((Hash32, TempEntity), Hash32, [Hash32])
adjList = f (Hash32, TempEntity)
entities f (Hash32, TempEntity)
-> ((Hash32, TempEntity)
    -> ((Hash32, TempEntity), Hash32, [Hash32]))
-> f ((Hash32, TempEntity), Hash32, [Hash32])
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \(Hash32
hash32, TempEntity
entity) -> ((Hash32
hash32, TempEntity
entity), Hash32
hash32, Set Hash32 -> [Hash32]
forall a. Set a -> [a]
Set.toList (Set Hash32 -> [Hash32]) -> Set Hash32 -> [Hash32]
forall a b. (a -> b) -> a -> b
$ Entity Text Hash32 Hash32 -> Set Hash32
forall hash text noSyncHash.
Ord hash =>
Entity text noSyncHash hash -> Set hash
Share.entityDependencies (TempEntity -> Entity Text Hash32 Hash32
tempEntityToEntity TempEntity
entity))
      (Graph
graph, Int -> ((Hash32, TempEntity), Hash32, [Hash32])
vertexInfo, Hash32 -> Maybe Int
_vertexForKey) = [((Hash32, TempEntity), Hash32, [Hash32])]
-> (Graph, Int -> ((Hash32, TempEntity), Hash32, [Hash32]),
    Hash32 -> Maybe Int)
forall key node.
Ord key =>
[(node, key, [key])]
-> (Graph, Int -> (node, key, [key]), key -> Maybe Int)
Graph.graphFromEdges (f ((Hash32, TempEntity), Hash32, [Hash32])
-> [((Hash32, TempEntity), Hash32, [Hash32])]
forall a. f a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
Foldable.toList f ((Hash32, TempEntity), Hash32, [Hash32])
adjList)
   in Graph -> [Int]
Graph.reverseTopSort Graph
graph [Int] -> (Int -> (Hash32, TempEntity)) -> [(Hash32, TempEntity)]
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \Int
v -> (Getting
  (Hash32, TempEntity)
  ((Hash32, TempEntity), Hash32, [Hash32])
  (Hash32, TempEntity)
-> ((Hash32, TempEntity), Hash32, [Hash32]) -> (Hash32, TempEntity)
forall s (m :: * -> *) a. MonadReader s m => Getting a s a -> m a
view Getting
  (Hash32, TempEntity)
  ((Hash32, TempEntity), Hash32, [Hash32])
  (Hash32, TempEntity)
forall s t a b. Field1 s t a b => Lens s t a b
Lens
  ((Hash32, TempEntity), Hash32, [Hash32])
  ((Hash32, TempEntity), Hash32, [Hash32])
  (Hash32, TempEntity)
  (Hash32, TempEntity)
_1 (((Hash32, TempEntity), Hash32, [Hash32]) -> (Hash32, TempEntity))
-> ((Hash32, TempEntity), Hash32, [Hash32]) -> (Hash32, TempEntity)
forall a b. (a -> b) -> a -> b
$ Int -> ((Hash32, TempEntity), Hash32, [Hash32])
vertexInfo Int
v)

-- | Unpack a single entity chunk, returning the entity if it's not already in the codebase, Nothing otherwise.
unpackChunk :: SyncV2.EntityChunk -> ExceptT SyncErr Sqlite.Transaction (Maybe (Hash32, TempEntity))
unpackChunk :: EntityChunk
-> ExceptT SyncErr Transaction (Maybe (Hash32, TempEntity))
unpackChunk = \case
  SyncV2.EntityChunk {Hash32
hash :: Hash32
$sel:hash:EntityChunk :: EntityChunk -> Hash32
hash, $sel:entityCBOR:EntityChunk :: EntityChunk -> CBORBytes TempEntity
entityCBOR = CBORBytes TempEntity
entityBytes} -> do
    -- Only want entities we don't already have
    Transaction (Maybe EntityLocation)
-> ExceptT SyncErr Transaction (Maybe EntityLocation)
forall (m :: * -> *) a. Monad m => m a -> ExceptT SyncErr m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (Hash32 -> Transaction (Maybe EntityLocation)
Q.entityLocation Hash32
hash) ExceptT SyncErr Transaction (Maybe EntityLocation)
-> (Maybe EntityLocation
    -> ExceptT SyncErr Transaction (Maybe (Hash32, TempEntity)))
-> ExceptT SyncErr Transaction (Maybe (Hash32, TempEntity))
forall a b.
ExceptT SyncErr Transaction a
-> (a -> ExceptT SyncErr Transaction b)
-> ExceptT SyncErr Transaction b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
      Just EntityLocation
Q.EntityInMainStorage -> Maybe (Hash32, TempEntity)
-> ExceptT SyncErr Transaction (Maybe (Hash32, TempEntity))
forall a. a -> ExceptT SyncErr Transaction a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (Hash32, TempEntity)
forall a. Maybe a
Nothing
      Maybe EntityLocation
_ -> do
        ((Hash32, TempEntity) -> Maybe (Hash32, TempEntity)
forall a. a -> Maybe a
Just ((Hash32, TempEntity) -> Maybe (Hash32, TempEntity))
-> (TempEntity -> (Hash32, TempEntity))
-> TempEntity
-> Maybe (Hash32, TempEntity)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Hash32
hash,)) (TempEntity -> Maybe (Hash32, TempEntity))
-> ExceptT SyncErr Transaction TempEntity
-> ExceptT SyncErr Transaction (Maybe (Hash32, TempEntity))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> CBORBytes TempEntity -> ExceptT SyncErr Transaction TempEntity
unpackEntity CBORBytes TempEntity
entityBytes
    where
      unpackEntity :: (CBORBytes TempEntity) -> ExceptT SyncErr Sqlite.Transaction TempEntity
      unpackEntity :: CBORBytes TempEntity -> ExceptT SyncErr Transaction TempEntity
unpackEntity CBORBytes TempEntity
entityBytes = do
        case CBORBytes TempEntity -> Either DeserialiseFailure TempEntity
forall t. Serialise t => CBORBytes t -> Either DeserialiseFailure t
CBOR.deserialiseOrFailCBORBytes CBORBytes TempEntity
entityBytes of
          Left DeserialiseFailure
err -> do SyncErr -> ExceptT SyncErr Transaction TempEntity
forall a. SyncErr -> ExceptT SyncErr Transaction a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> ExceptT SyncErr Transaction TempEntity)
-> SyncErr -> ExceptT SyncErr Transaction TempEntity
forall a b. (a -> b) -> a -> b
$ (PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (SyncError -> PullError) -> SyncError -> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncError -> PullError
SyncV2.PullError'Sync (SyncError -> SyncErr) -> SyncError -> SyncErr
forall a b. (a -> b) -> a -> b
$ DeserialiseFailure -> SyncError
SyncV2.SyncErrorDeserializationFailure DeserialiseFailure
err)
          Right TempEntity
entity -> TempEntity -> ExceptT SyncErr Transaction TempEntity
forall a. a -> ExceptT SyncErr Transaction a
forall (f :: * -> *) a. Applicative f => a -> f a
pure TempEntity
entity

unpackChunks :: Codebase.Codebase IO v a -> Stream [SyncV2.EntityChunk] (Vector (Hash32, TempEntity))
unpackChunks :: forall v a.
Codebase IO v a
-> Stream [EntityChunk] (Vector (Hash32, TempEntity))
unpackChunks Codebase IO v a
codebase = ([EntityChunk]
 -> ExceptT SyncErr (ResourceT IO) (Vector (Hash32, TempEntity)))
-> Stream [EntityChunk] (Vector (Hash32, TempEntity))
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> ConduitT a b m ()
C.mapM \[EntityChunk]
xs -> ResourceT IO (Either SyncErr (Vector (Hash32, TempEntity)))
-> ExceptT SyncErr (ResourceT IO) (Vector (Hash32, TempEntity))
forall e (m :: * -> *) a. m (Either e a) -> ExceptT e m a
ExceptT (ResourceT IO (Either SyncErr (Vector (Hash32, TempEntity)))
 -> ExceptT SyncErr (ResourceT IO) (Vector (Hash32, TempEntity)))
-> (ExceptT SyncErr Transaction (Vector (Hash32, TempEntity))
    -> ResourceT IO (Either SyncErr (Vector (Hash32, TempEntity))))
-> ExceptT SyncErr Transaction (Vector (Hash32, TempEntity))
-> ExceptT SyncErr (ResourceT IO) (Vector (Hash32, TempEntity))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO (Either SyncErr (Vector (Hash32, TempEntity)))
-> ResourceT IO (Either SyncErr (Vector (Hash32, TempEntity)))
forall (m :: * -> *) a. Monad m => m a -> ResourceT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (IO (Either SyncErr (Vector (Hash32, TempEntity)))
 -> ResourceT IO (Either SyncErr (Vector (Hash32, TempEntity))))
-> (ExceptT SyncErr Transaction (Vector (Hash32, TempEntity))
    -> IO (Either SyncErr (Vector (Hash32, TempEntity))))
-> ExceptT SyncErr Transaction (Vector (Hash32, TempEntity))
-> ResourceT IO (Either SyncErr (Vector (Hash32, TempEntity)))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Codebase IO v a
-> ExceptT SyncErr Transaction (Vector (Hash32, TempEntity))
-> IO (Either SyncErr (Vector (Hash32, TempEntity)))
forall (m :: * -> *) v a e b.
MonadIO m =>
Codebase m v a -> ExceptT e Transaction b -> m (Either e b)
Codebase.runTransactionExceptT Codebase IO v a
codebase (ExceptT SyncErr Transaction (Vector (Hash32, TempEntity))
 -> ExceptT SyncErr (ResourceT IO) (Vector (Hash32, TempEntity)))
-> ExceptT SyncErr Transaction (Vector (Hash32, TempEntity))
-> ExceptT SyncErr (ResourceT IO) (Vector (Hash32, TempEntity))
forall a b. (a -> b) -> a -> b
$ do
  [EntityChunk]
-> (EntityChunk
    -> ExceptT SyncErr Transaction (Maybe (Hash32, TempEntity)))
-> ExceptT SyncErr Transaction [Maybe (Hash32, TempEntity)]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
t a -> (a -> f b) -> f (t b)
for [EntityChunk]
xs EntityChunk
-> ExceptT SyncErr Transaction (Maybe (Hash32, TempEntity))
unpackChunk
    ExceptT SyncErr Transaction [Maybe (Hash32, TempEntity)]
-> ([Maybe (Hash32, TempEntity)] -> [(Hash32, TempEntity)])
-> ExceptT SyncErr Transaction [(Hash32, TempEntity)]
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> [Maybe (Hash32, TempEntity)] -> [(Hash32, TempEntity)]
forall a. [Maybe a] -> [a]
catMaybes
    ExceptT SyncErr Transaction [(Hash32, TempEntity)]
-> ([(Hash32, TempEntity)] -> Vector (Hash32, TempEntity))
-> ExceptT SyncErr Transaction (Vector (Hash32, TempEntity))
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> [(Hash32, TempEntity)] -> Vector (Hash32, TempEntity)
forall a. [a] -> Vector a
Vector.fromList

-- | Stream entities from one codebase into another.
streamIntoCodebase ::
  -- | Whether to validate entities as they're imported.
  Bool ->
  Codebase.Codebase IO v a ->
  SyncV2.StreamInitInfo ->
  Stream () SyncV2.EntityChunk ->
  StreamM ()
streamIntoCodebase :: forall v a.
Bool
-> Codebase IO v a
-> StreamInitInfo
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
streamIntoCodebase Bool
shouldValidate Codebase IO v a
codebase SyncV2.StreamInitInfo {Version
version :: Version
$sel:version:StreamInitInfo :: StreamInitInfo -> Version
version, EntitySorting
entitySorting :: EntitySorting
$sel:entitySorting:StreamInitInfo :: StreamInitInfo -> EntitySorting
entitySorting, $sel:numEntities:StreamInitInfo :: StreamInitInfo -> Maybe Word64
numEntities = Maybe Word64
numEntities} Stream () EntityChunk
stream = ResourceT IO (Either SyncErr ())
-> ExceptT SyncErr (ResourceT IO) ()
forall e (m :: * -> *) a. m (Either e a) -> ExceptT e m a
ExceptT do
  Maybe Int
-> (ConduitT
      EntityChunk EntityChunk (ExceptT SyncErr (ResourceT IO)) ()
    -> ResourceT IO (Either SyncErr ()))
-> ResourceT IO (Either SyncErr ())
forall (m :: * -> *) (n :: * -> *) i a.
(MonadIO m, MonadUnliftIO n) =>
Maybe Int -> (ConduitT i i m () -> n a) -> n a
withStreamProgressCallback (Word64 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Word64 -> Int) -> Maybe Word64 -> Maybe Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe Word64
numEntities) \ConduitT
  EntityChunk EntityChunk (ExceptT SyncErr (ResourceT IO)) ()
countC -> ExceptT SyncErr (ResourceT IO) ()
-> ResourceT IO (Either SyncErr ())
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT do
    -- Add a counter to the stream to track how many entities we've processed.
    let stream' :: Stream () EntityChunk
stream' = Stream () EntityChunk
stream Stream () EntityChunk
-> ConduitT
     EntityChunk EntityChunk (ExceptT SyncErr (ResourceT IO)) ()
-> Stream () EntityChunk
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| ConduitT
  EntityChunk EntityChunk (ExceptT SyncErr (ResourceT IO)) ()
countC
    case Version
version of
      (SyncV2.Version Word16
1) -> () -> ExceptT SyncErr (ResourceT IO) ()
forall a. a -> ExceptT SyncErr (ResourceT IO) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
      Version
v -> SyncErr -> ExceptT SyncErr (ResourceT IO) ()
forall a. SyncErr -> ExceptT SyncErr (ResourceT IO) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> ExceptT SyncErr (ResourceT IO) ())
-> (SyncError -> SyncErr)
-> SyncError
-> ExceptT SyncErr (ResourceT IO) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (SyncError -> PullError) -> SyncError -> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncError -> PullError
SyncV2.PullError'Sync (SyncError -> ExceptT SyncErr (ResourceT IO) ())
-> SyncError -> ExceptT SyncErr (ResourceT IO) ()
forall a b. (a -> b) -> a -> b
$ Version -> SyncError
SyncV2.SyncErrorUnsupportedVersion Version
v

    case EntitySorting
entitySorting of
      EntitySorting
SyncV2.DependenciesFirst -> Bool
-> Codebase IO v a
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
forall v a.
Bool
-> Codebase IO v a
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
syncSortedStream Bool
shouldValidate Codebase IO v a
codebase Stream () EntityChunk
stream'
      EntitySorting
SyncV2.Unsorted -> Bool
-> Codebase IO v a
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
forall v a.
Bool
-> Codebase IO v a
-> Stream () EntityChunk
-> ExceptT SyncErr (ResourceT IO) ()
syncUnsortedStream Bool
shouldValidate Codebase IO v a
codebase Stream () EntityChunk
stream'

-- | A sanity-check to verify that the hash we expected to import from the stream was successfully loaded into the codebase.
afterSyncChecks :: Codebase.Codebase IO v a -> Hash32 -> ExceptT (SyncError SyncV2.PullError) IO ()
afterSyncChecks :: forall v a. Codebase IO v a -> Hash32 -> ExceptT SyncErr IO ()
afterSyncChecks Codebase IO v a
codebase Hash32
hash = do
  IO Bool -> ExceptT SyncErr IO Bool
forall (m :: * -> *) a. Monad m => m a -> ExceptT SyncErr m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (Codebase IO v a -> Hash32 -> IO Bool
forall v a. Codebase IO v a -> Hash32 -> IO Bool
didCausalSuccessfullyImport Codebase IO v a
codebase Hash32
hash) ExceptT SyncErr IO Bool
-> (Bool -> ExceptT SyncErr IO ()) -> ExceptT SyncErr IO ()
forall a b.
ExceptT SyncErr IO a
-> (a -> ExceptT SyncErr IO b) -> ExceptT SyncErr IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    Bool
False -> do
      SyncErr -> ExceptT SyncErr IO ()
forall a. SyncErr -> ExceptT SyncErr IO a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (PullError -> SyncErr
forall e. e -> SyncError e
SyncError (SyncError -> PullError
SyncV2.PullError'Sync (SyncError -> PullError)
-> (Hash32 -> SyncError) -> Hash32 -> PullError
forall b c a. (b -> c) -> (a -> b) -> a -> c
. CausalHash -> SyncError
SyncV2.SyncErrorExpectedResultNotInMain (CausalHash -> SyncError)
-> (Hash32 -> CausalHash) -> Hash32 -> SyncError
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Hash32 -> CausalHash
hash32ToCausalHash (Hash32 -> PullError) -> Hash32 -> PullError
forall a b. (a -> b) -> a -> b
$ Hash32
hash))
    Bool
True -> () -> ExceptT SyncErr IO ()
forall a. a -> ExceptT SyncErr IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
  ExceptT SyncErr IO Bool -> ExceptT SyncErr IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (ExceptT SyncErr IO Bool -> ExceptT SyncErr IO ())
-> ExceptT SyncErr IO Bool -> ExceptT SyncErr IO ()
forall a b. (a -> b) -> a -> b
$ IO Bool -> ExceptT SyncErr IO Bool
forall a. IO a -> ExceptT SyncErr IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Codebase IO v a -> forall x. (Connection -> IO x) -> IO x
forall (m :: * -> *) v a.
Codebase m v a -> forall x. (Connection -> m x) -> m x
Codebase.withConnection Codebase IO v a
codebase Connection -> IO Bool
Sqlite.vacuum)
  where
    -- Verify that the expected hash made it into main storage.
    didCausalSuccessfullyImport :: Codebase.Codebase IO v a -> Hash32 -> IO Bool
    didCausalSuccessfullyImport :: forall v a. Codebase IO v a -> Hash32 -> IO Bool
didCausalSuccessfullyImport Codebase IO v a
codebase Hash32
hash = do
      let expectedHash :: CausalHash
expectedHash = Hash32 -> CausalHash
hash32ToCausalHash Hash32
hash
      Maybe (CausalHashId, BranchHashId) -> Bool
forall a. Maybe a -> Bool
isJust (Maybe (CausalHashId, BranchHashId) -> Bool)
-> IO (Maybe (CausalHashId, BranchHashId)) -> IO Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (Codebase IO v a
-> Transaction (Maybe (CausalHashId, BranchHashId))
-> IO (Maybe (CausalHashId, BranchHashId))
forall (m :: * -> *) v a b.
MonadIO m =>
Codebase m v a -> Transaction b -> m b
Codebase.runTransaction Codebase IO v a
codebase (Transaction (Maybe (CausalHashId, BranchHashId))
 -> IO (Maybe (CausalHashId, BranchHashId)))
-> Transaction (Maybe (CausalHashId, BranchHashId))
-> IO (Maybe (CausalHashId, BranchHashId))
forall a b. (a -> b) -> a -> b
$ CausalHash -> Transaction (Maybe (CausalHashId, BranchHashId))
Q.loadCausalByCausalHash CausalHash
expectedHash)

-- | Load and stream entities for a given causal hash from a codebase into a stream.
withCodebaseEntityStream ::
  (MonadIO m) =>
  Sqlite.Connection ->
  CausalHash ->
  Maybe SyncV2.BranchRef ->
  -- | Callback to call with the total count of entities and the stream.
  (Int -> Stream () SyncV2.DownloadEntitiesChunk -> m r) ->
  m r
withCodebaseEntityStream :: forall (m :: * -> *) r.
MonadIO m =>
Connection
-> CausalHash
-> Maybe BranchRef
-> (Int -> Stream () DownloadEntitiesChunk -> m r)
-> m r
withCodebaseEntityStream Connection
conn CausalHash
rootHash Maybe BranchRef
mayBranchRef Int -> Stream () DownloadEntitiesChunk -> m r
callback = do
  Map Hash32 (Entity Text Hash32 Hash32)
entities <- IO (Map Hash32 (Entity Text Hash32 Hash32))
-> m (Map Hash32 (Entity Text Hash32 Hash32))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Map Hash32 (Entity Text Hash32 Hash32))
 -> m (Map Hash32 (Entity Text Hash32 Hash32)))
-> IO (Map Hash32 (Entity Text Hash32 Hash32))
-> m (Map Hash32 (Entity Text Hash32 Hash32))
forall a b. (a -> b) -> a -> b
$ ((Int -> IO ()) -> IO (Map Hash32 (Entity Text Hash32 Hash32)))
-> IO (Map Hash32 (Entity Text Hash32 Hash32))
forall (m :: * -> *) a.
MonadUnliftIO m =>
((Int -> m ()) -> m a) -> m a
withEntityLoadingCallback (((Int -> IO ()) -> IO (Map Hash32 (Entity Text Hash32 Hash32)))
 -> IO (Map Hash32 (Entity Text Hash32 Hash32)))
-> ((Int -> IO ()) -> IO (Map Hash32 (Entity Text Hash32 Hash32)))
-> IO (Map Hash32 (Entity Text Hash32 Hash32))
forall a b. (a -> b) -> a -> b
$ \Int -> IO ()
counter -> do
    Connection
-> Transaction (Map Hash32 (Entity Text Hash32 Hash32))
-> IO (Map Hash32 (Entity Text Hash32 Hash32))
forall (m :: * -> *) a.
(MonadIO m, HasCallStack) =>
Connection -> Transaction a -> m a
Sqlite.runTransaction Connection
conn (CausalHash
-> (Int -> IO ())
-> Transaction (Map Hash32 (Entity Text Hash32 Hash32))
depsForCausal CausalHash
rootHash Int -> IO ()
counter)
  IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Handle -> Text -> IO ()
Text.hPutStrLn Handle
IO.stderr (Text -> IO ()) -> Text -> IO ()
forall a b. (a -> b) -> a -> b
$ Text
"Finished loading entities, writing sync-file."
  let totalEntities :: Int
totalEntities = Int -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Int) -> Int -> Int
forall a b. (a -> b) -> a -> b
$ Map Hash32 (Entity Text Hash32 Hash32) -> Int
forall k a. Map k a -> Int
Map.size Map Hash32 (Entity Text Hash32 Hash32)
entities
  let initialChunk :: DownloadEntitiesChunk
initialChunk =
        StreamInitInfo -> DownloadEntitiesChunk
SyncV2.InitialC
          ( SyncV2.StreamInitInfo
              { $sel:rootCausalHash:StreamInitInfo :: Hash32
rootCausalHash = CausalHash -> Hash32
causalHashToHash32 CausalHash
rootHash,
                $sel:version:StreamInitInfo :: Version
version = Word16 -> Version
SyncV2.Version Word16
1,
                $sel:entitySorting:StreamInitInfo :: EntitySorting
entitySorting = EntitySorting
SyncV2.DependenciesFirst,
                $sel:numEntities:StreamInitInfo :: Maybe Word64
numEntities = Word64 -> Maybe Word64
forall a. a -> Maybe a
Just (Word64 -> Maybe Word64) -> Word64 -> Maybe Word64
forall a b. (a -> b) -> a -> b
$ Int -> Word64
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
totalEntities,
                $sel:rootBranchRef:StreamInitInfo :: Maybe BranchRef
rootBranchRef = Maybe BranchRef
mayBranchRef
              }
          )
  let contents :: [DownloadEntitiesChunk]
contents =
        Map Hash32 (Entity Text Hash32 Hash32)
entities
          Map Hash32 (Entity Text Hash32 Hash32)
-> (Map Hash32 (Entity Text Hash32 Hash32)
    -> Map Hash32 TempEntity)
-> Map Hash32 TempEntity
forall a b. a -> (a -> b) -> b
& (Entity Text Hash32 Hash32 -> TempEntity)
-> Map Hash32 (Entity Text Hash32 Hash32) -> Map Hash32 TempEntity
forall a b. (a -> b) -> Map Hash32 a -> Map Hash32 b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((Hash32 -> Hash32) -> Entity Text Hash32 Hash32 -> TempEntity
forall hash.
(hash -> Hash32) -> Entity Text Hash32 hash -> TempEntity
Sync.entityToTempEntity Hash32 -> Hash32
forall a. a -> a
id)
          Map Hash32 TempEntity
-> (Map Hash32 TempEntity -> [(Hash32, TempEntity)])
-> [(Hash32, TempEntity)]
forall a b. a -> (a -> b) -> b
& Map Hash32 TempEntity -> [(Hash32, TempEntity)]
forall k a. Map k a -> [(k, a)]
Map.toList
          [(Hash32, TempEntity)]
-> ([(Hash32, TempEntity)] -> [(Hash32, TempEntity)])
-> [(Hash32, TempEntity)]
forall a b. a -> (a -> b) -> b
& [(Hash32, TempEntity)] -> [(Hash32, TempEntity)]
forall (f :: * -> *).
(Foldable f, Functor f) =>
f (Hash32, TempEntity) -> [(Hash32, TempEntity)]
sortDependencyFirst
          [(Hash32, TempEntity)]
-> ([(Hash32, TempEntity)] -> [DownloadEntitiesChunk])
-> [DownloadEntitiesChunk]
forall a b. a -> (a -> b) -> b
& ( ((Hash32, TempEntity) -> DownloadEntitiesChunk)
-> [(Hash32, TempEntity)] -> [DownloadEntitiesChunk]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap \(Hash32
hash, TempEntity
entity) ->
                let entityCBOR :: CBORBytes TempEntity
entityCBOR = (TempEntity -> CBORBytes TempEntity
forall t. Serialise t => t -> CBORBytes t
CBOR.serialiseCBORBytes TempEntity
entity)
                 in EntityChunk -> DownloadEntitiesChunk
SyncV2.EntityC (SyncV2.EntityChunk {Hash32
$sel:hash:EntityChunk :: Hash32
hash :: Hash32
hash, CBORBytes TempEntity
$sel:entityCBOR:EntityChunk :: CBORBytes TempEntity
entityCBOR :: CBORBytes TempEntity
entityCBOR})
            )
          [DownloadEntitiesChunk]
-> ([DownloadEntitiesChunk] -> [DownloadEntitiesChunk])
-> [DownloadEntitiesChunk]
forall a b. a -> (a -> b) -> b
& (DownloadEntitiesChunk
initialChunk DownloadEntitiesChunk
-> [DownloadEntitiesChunk] -> [DownloadEntitiesChunk]
forall a. a -> [a] -> [a]
:)
  let stream :: ConduitT
  ()
  (Element [DownloadEntitiesChunk])
  (ExceptT SyncErr (ResourceT IO))
  ()
stream = [DownloadEntitiesChunk]
-> ConduitT
     ()
     (Element [DownloadEntitiesChunk])
     (ExceptT SyncErr (ResourceT IO))
     ()
forall (m :: * -> *) mono i.
(Monad m, MonoFoldable mono) =>
mono -> ConduitT i (Element mono) m ()
C.yieldMany [DownloadEntitiesChunk]
contents
  Int -> Stream () DownloadEntitiesChunk -> m r
callback Int
totalEntities Stream () DownloadEntitiesChunk
stream
  where
    -- Collect all dependencies of a given causal hash.
    depsForCausal :: CausalHash -> (Int -> IO ()) -> Sqlite.Transaction (Map Hash32 (Sync.Entity Text Hash32 Hash32))
    depsForCausal :: CausalHash
-> (Int -> IO ())
-> Transaction (Map Hash32 (Entity Text Hash32 Hash32))
depsForCausal CausalHash
causalHash Int -> IO ()
counter = do
      (StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
 -> Map Hash32 (Entity Text Hash32 Hash32)
 -> Transaction (Map Hash32 (Entity Text Hash32 Hash32)))
-> Map Hash32 (Entity Text Hash32 Hash32)
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
-> Transaction (Map Hash32 (Entity Text Hash32 Hash32))
forall a b c. (a -> b -> c) -> b -> a -> c
flip StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
-> Map Hash32 (Entity Text Hash32 Hash32)
-> Transaction (Map Hash32 (Entity Text Hash32 Hash32))
forall (m :: * -> *) s a. Monad m => StateT s m a -> s -> m s
execStateT Map Hash32 (Entity Text Hash32 Hash32)
forall a. Monoid a => a
mempty (StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
 -> Transaction (Map Hash32 (Entity Text Hash32 Hash32)))
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
-> Transaction (Map Hash32 (Entity Text Hash32 Hash32))
forall a b. (a -> b) -> a -> b
$ Hash32
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
expandEntities (CausalHash -> Hash32
causalHashToHash32 CausalHash
causalHash)
      where
        expandEntities :: Hash32 -> ((StateT (Map Hash32 (Sync.Entity Text Hash32 Hash32)) Sqlite.Transaction)) ()
        expandEntities :: Hash32
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
expandEntities Hash32
hash32 = do
          (Map Hash32 (Entity Text Hash32 Hash32) -> Bool)
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction Bool
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets (Hash32 -> Map Hash32 (Entity Text Hash32 Hash32) -> Bool
forall k a. Ord k => k -> Map k a -> Bool
Map.member Hash32
hash32) StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction Bool
-> (Bool
    -> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ())
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
forall a b.
StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction a
-> (a
    -> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction b)
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
            Bool
True -> ()
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
forall a.
a -> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
            Bool
False -> do
              Entity Text Hash32 Hash32
entity <- Transaction (Entity Text Hash32 Hash32)
-> StateT
     (Map Hash32 (Entity Text Hash32 Hash32))
     Transaction
     (Entity Text Hash32 Hash32)
forall (m :: * -> *) a.
Monad m =>
m a -> StateT (Map Hash32 (Entity Text Hash32 Hash32)) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (Transaction (Entity Text Hash32 Hash32)
 -> StateT
      (Map Hash32 (Entity Text Hash32 Hash32))
      Transaction
      (Entity Text Hash32 Hash32))
-> Transaction (Entity Text Hash32 Hash32)
-> StateT
     (Map Hash32 (Entity Text Hash32 Hash32))
     Transaction
     (Entity Text Hash32 Hash32)
forall a b. (a -> b) -> a -> b
$ Hash32 -> Transaction (Entity Text Hash32 Hash32)
Sync.expectEntity Hash32
hash32
              (Map Hash32 (Entity Text Hash32 Hash32)
 -> Map Hash32 (Entity Text Hash32 Hash32))
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify (Hash32
-> Entity Text Hash32 Hash32
-> Map Hash32 (Entity Text Hash32 Hash32)
-> Map Hash32 (Entity Text Hash32 Hash32)
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert Hash32
hash32 Entity Text Hash32 Hash32
entity)
              Transaction ()
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
forall (m :: * -> *) a.
Monad m =>
m a -> StateT (Map Hash32 (Entity Text Hash32 Hash32)) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (Transaction ()
 -> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ())
-> (IO () -> Transaction ())
-> IO ()
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO () -> Transaction ()
forall a. HasCallStack => IO a -> Transaction a
Sqlite.unsafeIO (IO ()
 -> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ())
-> IO ()
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
counter Int
1
              Getting
  (Traversed
     () (StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction))
  (Entity Text Hash32 Hash32)
  Hash32
-> (Hash32
    -> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ())
-> Entity Text Hash32 Hash32
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
forall (f :: * -> *) r s a.
Functor f =>
Getting (Traversed r f) s a -> (a -> f r) -> s -> f ()
traverseOf_ Getting
  (Traversed
     () (StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction))
  (Entity Text Hash32 Hash32)
  Hash32
forall (m :: * -> *) hash' hash text noSyncHash.
(Applicative m, Ord hash') =>
(hash -> m hash')
-> Entity text noSyncHash hash -> m (Entity text noSyncHash hash')
Sync.entityHashes_ Hash32
-> StateT (Map Hash32 (Entity Text Hash32 Hash32)) Transaction ()
expandEntities Entity Text Hash32 Hash32
entity

-- | Gets the framed chunks from a NetString framed stream.
_unNetString :: ConduitT ByteString ByteString StreamM ()
_unNetString :: ConduitT ByteString ByteString (ExceptT SyncErr (ResourceT IO)) ()
_unNetString = do
  ByteString
bs <- Parser ByteString ByteString
-> ConduitT
     ByteString ByteString (ExceptT SyncErr (ResourceT IO)) ByteString
forall a (m :: * -> *) b o.
(AttoparsecInput a, MonadThrow m) =>
Parser a b -> ConduitT a o m b
C.sinkParser (Parser ByteString ByteString
 -> ConduitT
      ByteString ByteString (ExceptT SyncErr (ResourceT IO)) ByteString)
-> Parser ByteString ByteString
-> ConduitT
     ByteString ByteString (ExceptT SyncErr (ResourceT IO)) ByteString
forall a b. (a -> b) -> a -> b
$ do
    Int
len <- Parser Int
forall a. Integral a => Parser a
A8.decimal
    Char
_ <- Char -> Parser Char
A8.char Char
':'
    ByteString
bs <- Int -> Parser ByteString ByteString
A.take Int
len
    Char
_ <- Char -> Parser Char
A8.char Char
','
    pure ByteString
bs
  ByteString
-> ConduitT
     ByteString ByteString (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
C.yield ByteString
bs

_decodeFramedEntity :: ByteString -> StreamM SyncV2.DownloadEntitiesChunk
_decodeFramedEntity :: ByteString -> StreamM DownloadEntitiesChunk
_decodeFramedEntity ByteString
bs = do
  case ByteString -> Either DeserialiseFailure DownloadEntitiesChunk
forall a. Serialise a => ByteString -> Either DeserialiseFailure a
CBOR.deserialiseOrFail (ByteString -> ByteString
BL.fromStrict ByteString
bs) of
    Left DeserialiseFailure
err -> SyncErr -> StreamM DownloadEntitiesChunk
forall a. SyncErr -> ExceptT SyncErr (ResourceT IO) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> StreamM DownloadEntitiesChunk)
-> (SyncError -> SyncErr)
-> SyncError
-> StreamM DownloadEntitiesChunk
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (SyncError -> PullError) -> SyncError -> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncError -> PullError
SyncV2.PullError'Sync (SyncError -> StreamM DownloadEntitiesChunk)
-> SyncError -> StreamM DownloadEntitiesChunk
forall a b. (a -> b) -> a -> b
$ DeserialiseFailure -> SyncError
SyncV2.SyncErrorDeserializationFailure DeserialiseFailure
err
    Right DownloadEntitiesChunk
chunk -> DownloadEntitiesChunk -> StreamM DownloadEntitiesChunk
forall a. a -> ExceptT SyncErr (ResourceT IO) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure DownloadEntitiesChunk
chunk

-- | Unpacks a stream of tightly-packed CBOR entities without any framing/separators.
decodeUnframedEntities :: forall a. (CBOR.Serialise a) => Stream ByteString a
decodeUnframedEntities :: forall a. Serialise a => Stream ByteString a
decodeUnframedEntities = (forall a.
 ExceptT SyncErr (ST RealWorld) a
 -> ExceptT SyncErr (ResourceT IO) a)
-> ConduitT ByteString a (ExceptT SyncErr (ST RealWorld)) ()
-> ConduitT ByteString a (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) (n :: * -> *) i o r.
Monad m =>
(forall a. m a -> n a) -> ConduitT i o m r -> ConduitT i o n r
C.transPipe ((ST RealWorld (Either SyncErr a)
 -> ResourceT IO (Either SyncErr a))
-> ExceptT SyncErr (ST RealWorld) a
-> ExceptT SyncErr (ResourceT IO) a
forall (m :: * -> *) e a (n :: * -> *) e' b.
(m (Either e a) -> n (Either e' b))
-> ExceptT e m a -> ExceptT e' n b
mapExceptT (IO (Either SyncErr a) -> ResourceT IO (Either SyncErr a)
forall (m :: * -> *) a. Monad m => m a -> ResourceT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (IO (Either SyncErr a) -> ResourceT IO (Either SyncErr a))
-> (ST RealWorld (Either SyncErr a) -> IO (Either SyncErr a))
-> ST RealWorld (Either SyncErr a)
-> ResourceT IO (Either SyncErr a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ST RealWorld (Either SyncErr a) -> IO (Either SyncErr a)
forall a. ST RealWorld a -> IO a
stToIO)) (ConduitT ByteString a (ExceptT SyncErr (ST RealWorld)) ()
 -> ConduitT ByteString a (ExceptT SyncErr (ResourceT IO)) ())
-> ConduitT ByteString a (ExceptT SyncErr (ST RealWorld)) ()
-> ConduitT ByteString a (ExceptT SyncErr (ResourceT IO)) ()
forall a b. (a -> b) -> a -> b
$ do
  ConduitT
  ByteString a (ExceptT SyncErr (ST RealWorld)) (Maybe ByteString)
forall (m :: * -> *) i o. Monad m => ConduitT i o m (Maybe i)
C.await ConduitT
  ByteString a (ExceptT SyncErr (ST RealWorld)) (Maybe ByteString)
-> (Maybe ByteString
    -> ConduitT ByteString a (ExceptT SyncErr (ST RealWorld)) ())
-> ConduitT ByteString a (ExceptT SyncErr (ST RealWorld)) ()
forall a b.
ConduitT ByteString a (ExceptT SyncErr (ST RealWorld)) a
-> (a -> ConduitT ByteString a (ExceptT SyncErr (ST RealWorld)) b)
-> ConduitT ByteString a (ExceptT SyncErr (ST RealWorld)) b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    Maybe ByteString
Nothing -> () -> ConduitT ByteString a (ExceptT SyncErr (ST RealWorld)) ()
forall a.
a -> ConduitT ByteString a (ExceptT SyncErr (ST RealWorld)) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
    Just ByteString
bs -> do
      Maybe ByteString -> ST RealWorld (IDecode RealWorld a)
d <- ConduitT
  ByteString
  a
  (ExceptT SyncErr (ST RealWorld))
  (Maybe ByteString -> ST RealWorld (IDecode RealWorld a))
forall s.
ConduitT
  ByteString
  a
  (ExceptT SyncErr (ST s))
  (Maybe ByteString -> ST s (IDecode s a))
newDecoder
      ByteString
-> (Maybe ByteString -> ST RealWorld (IDecode RealWorld a))
-> ConduitT ByteString a (ExceptT SyncErr (ST RealWorld)) ()
forall s.
ByteString
-> (Maybe ByteString -> ST s (IDecode s a))
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
loop ByteString
bs Maybe ByteString -> ST RealWorld (IDecode RealWorld a)
d
  where
    newDecoder :: ConduitT ByteString a (ExceptT SyncErr (ST s)) (Maybe ByteString -> ST s (CBOR.IDecode s a))
    newDecoder :: forall s.
ConduitT
  ByteString
  a
  (ExceptT SyncErr (ST s))
  (Maybe ByteString -> ST s (IDecode s a))
newDecoder = do
      (ExceptT SyncErr (ST s) (IDecode s a)
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) (IDecode s a)
forall (m :: * -> *) a. Monad m => m a -> ConduitT ByteString a m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ExceptT SyncErr (ST s) (IDecode s a)
 -> ConduitT ByteString a (ExceptT SyncErr (ST s)) (IDecode s a))
-> (ST s (IDecode s a) -> ExceptT SyncErr (ST s) (IDecode s a))
-> ST s (IDecode s a)
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) (IDecode s a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ST s (IDecode s a) -> ExceptT SyncErr (ST s) (IDecode s a)
forall (m :: * -> *) a. Monad m => m a -> ExceptT SyncErr m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift) ST s (IDecode s a)
forall a s. Serialise a => ST s (IDecode s a)
CBOR.deserialiseIncremental ConduitT ByteString a (ExceptT SyncErr (ST s)) (IDecode s a)
-> (IDecode s a
    -> ConduitT
         ByteString
         a
         (ExceptT SyncErr (ST s))
         (Maybe ByteString -> ST s (IDecode s a)))
-> ConduitT
     ByteString
     a
     (ExceptT SyncErr (ST s))
     (Maybe ByteString -> ST s (IDecode s a))
forall a b.
ConduitT ByteString a (ExceptT SyncErr (ST s)) a
-> (a -> ConduitT ByteString a (ExceptT SyncErr (ST s)) b)
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        CBOR.Done ByteString
_ ByteOffset
_ a
_ -> SyncErr
-> ConduitT
     ByteString
     a
     (ExceptT SyncErr (ST s))
     (Maybe ByteString -> ST s (IDecode s a))
forall a.
SyncErr -> ConduitT ByteString a (ExceptT SyncErr (ST s)) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr
 -> ConduitT
      ByteString
      a
      (ExceptT SyncErr (ST s))
      (Maybe ByteString -> ST s (IDecode s a)))
-> (SyncError -> SyncErr)
-> SyncError
-> ConduitT
     ByteString
     a
     (ExceptT SyncErr (ST s))
     (Maybe ByteString -> ST s (IDecode s a))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (SyncError -> PullError) -> SyncError -> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncError -> PullError
SyncV2.PullError'Sync (SyncError
 -> ConduitT
      ByteString
      a
      (ExceptT SyncErr (ST s))
      (Maybe ByteString -> ST s (IDecode s a)))
-> SyncError
-> ConduitT
     ByteString
     a
     (ExceptT SyncErr (ST s))
     (Maybe ByteString -> ST s (IDecode s a))
forall a b. (a -> b) -> a -> b
$ Text -> SyncError
SyncV2.SyncErrorStreamFailure Text
"Invalid initial decoder"
        CBOR.Fail ByteString
_ ByteOffset
_ DeserialiseFailure
err -> SyncErr
-> ConduitT
     ByteString
     a
     (ExceptT SyncErr (ST s))
     (Maybe ByteString -> ST s (IDecode s a))
forall a.
SyncErr -> ConduitT ByteString a (ExceptT SyncErr (ST s)) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr
 -> ConduitT
      ByteString
      a
      (ExceptT SyncErr (ST s))
      (Maybe ByteString -> ST s (IDecode s a)))
-> (SyncError -> SyncErr)
-> SyncError
-> ConduitT
     ByteString
     a
     (ExceptT SyncErr (ST s))
     (Maybe ByteString -> ST s (IDecode s a))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (SyncError -> PullError) -> SyncError -> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncError -> PullError
SyncV2.PullError'Sync (SyncError
 -> ConduitT
      ByteString
      a
      (ExceptT SyncErr (ST s))
      (Maybe ByteString -> ST s (IDecode s a)))
-> SyncError
-> ConduitT
     ByteString
     a
     (ExceptT SyncErr (ST s))
     (Maybe ByteString -> ST s (IDecode s a))
forall a b. (a -> b) -> a -> b
$ DeserialiseFailure -> SyncError
SyncV2.SyncErrorDeserializationFailure DeserialiseFailure
err
        CBOR.Partial Maybe ByteString -> ST s (IDecode s a)
k -> (Maybe ByteString -> ST s (IDecode s a))
-> ConduitT
     ByteString
     a
     (ExceptT SyncErr (ST s))
     (Maybe ByteString -> ST s (IDecode s a))
forall a. a -> ConduitT ByteString a (ExceptT SyncErr (ST s)) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe ByteString -> ST s (IDecode s a)
k
    loop :: ByteString -> (Maybe ByteString -> ST s (CBOR.IDecode s a)) -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
    loop :: forall s.
ByteString
-> (Maybe ByteString -> ST s (IDecode s a))
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
loop ByteString
bs Maybe ByteString -> ST s (IDecode s a)
k = do
      (ExceptT SyncErr (ST s) (IDecode s a)
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) (IDecode s a)
forall (m :: * -> *) a. Monad m => m a -> ConduitT ByteString a m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ExceptT SyncErr (ST s) (IDecode s a)
 -> ConduitT ByteString a (ExceptT SyncErr (ST s)) (IDecode s a))
-> (ST s (IDecode s a) -> ExceptT SyncErr (ST s) (IDecode s a))
-> ST s (IDecode s a)
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) (IDecode s a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ST s (IDecode s a) -> ExceptT SyncErr (ST s) (IDecode s a)
forall (m :: * -> *) a. Monad m => m a -> ExceptT SyncErr m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift) (Maybe ByteString -> ST s (IDecode s a)
k (ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just ByteString
bs)) ConduitT ByteString a (ExceptT SyncErr (ST s)) (IDecode s a)
-> (IDecode s a
    -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ())
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall a b.
ConduitT ByteString a (ExceptT SyncErr (ST s)) a
-> (a -> ConduitT ByteString a (ExceptT SyncErr (ST s)) b)
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        CBOR.Fail ByteString
_ ByteOffset
_ DeserialiseFailure
err -> SyncErr -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall a.
SyncErr -> ConduitT ByteString a (ExceptT SyncErr (ST s)) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ())
-> (SyncError -> SyncErr)
-> SyncError
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (SyncError -> PullError) -> SyncError -> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncError -> PullError
SyncV2.PullError'Sync (SyncError -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ())
-> SyncError -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall a b. (a -> b) -> a -> b
$ DeserialiseFailure -> SyncError
SyncV2.SyncErrorDeserializationFailure DeserialiseFailure
err
        CBOR.Partial Maybe ByteString -> ST s (IDecode s a)
k' -> do
          -- We need more input, try to get some
          Maybe ByteString
nextBS <- ConduitT ByteString a (ExceptT SyncErr (ST s)) (Maybe ByteString)
forall (m :: * -> *) i o. Monad m => ConduitT i o m (Maybe i)
C.await
          case Maybe ByteString
nextBS of
            Maybe ByteString
Nothing -> do
              -- No more input, try to finish up the decoder.
              (ExceptT SyncErr (ST s) (IDecode s a)
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) (IDecode s a)
forall (m :: * -> *) a. Monad m => m a -> ConduitT ByteString a m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ExceptT SyncErr (ST s) (IDecode s a)
 -> ConduitT ByteString a (ExceptT SyncErr (ST s)) (IDecode s a))
-> (ST s (IDecode s a) -> ExceptT SyncErr (ST s) (IDecode s a))
-> ST s (IDecode s a)
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) (IDecode s a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ST s (IDecode s a) -> ExceptT SyncErr (ST s) (IDecode s a)
forall (m :: * -> *) a. Monad m => m a -> ExceptT SyncErr m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift) (Maybe ByteString -> ST s (IDecode s a)
k' Maybe ByteString
forall a. Maybe a
Nothing) ConduitT ByteString a (ExceptT SyncErr (ST s)) (IDecode s a)
-> (IDecode s a
    -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ())
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall a b.
ConduitT ByteString a (ExceptT SyncErr (ST s)) a
-> (a -> ConduitT ByteString a (ExceptT SyncErr (ST s)) b)
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
                CBOR.Done ByteString
_ ByteOffset
_ a
a -> a -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
C.yield a
a
                CBOR.Fail ByteString
_ ByteOffset
_ DeserialiseFailure
err -> SyncErr -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall a.
SyncErr -> ConduitT ByteString a (ExceptT SyncErr (ST s)) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ())
-> (SyncError -> SyncErr)
-> SyncError
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (SyncError -> PullError) -> SyncError -> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncError -> PullError
SyncV2.PullError'Sync (SyncError -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ())
-> SyncError -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall a b. (a -> b) -> a -> b
$ DeserialiseFailure -> SyncError
SyncV2.SyncErrorDeserializationFailure DeserialiseFailure
err
                CBOR.Partial Maybe ByteString -> ST s (IDecode s a)
_ -> SyncErr -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall a.
SyncErr -> ConduitT ByteString a (ExceptT SyncErr (ST s)) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ())
-> (SyncError -> SyncErr)
-> SyncError
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (SyncError -> PullError) -> SyncError -> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncError -> PullError
SyncV2.PullError'Sync (SyncError -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ())
-> SyncError -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall a b. (a -> b) -> a -> b
$ Text -> SyncError
SyncV2.SyncErrorStreamFailure Text
"Unexpected end of input"
            Just ByteString
bs' ->
              -- Have some input, keep going.
              ByteString
-> (Maybe ByteString -> ST s (IDecode s a))
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall s.
ByteString
-> (Maybe ByteString -> ST s (IDecode s a))
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
loop ByteString
bs' Maybe ByteString -> ST s (IDecode s a)
k'
        CBOR.Done ByteString
rem ByteOffset
_ a
a -> do
          a -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
C.yield a
a
          if ByteString -> Bool
BS.null ByteString
rem
            then do
              -- If we had no leftovers, we can check if there's any input left.
              ConduitT ByteString a (ExceptT SyncErr (ST s)) (Maybe ByteString)
forall (m :: * -> *) i o. Monad m => ConduitT i o m (Maybe i)
C.await ConduitT ByteString a (ExceptT SyncErr (ST s)) (Maybe ByteString)
-> (Maybe ByteString
    -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ())
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall a b.
ConduitT ByteString a (ExceptT SyncErr (ST s)) a
-> (a -> ConduitT ByteString a (ExceptT SyncErr (ST s)) b)
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
                Maybe ByteString
Nothing -> () -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall a. a -> ConduitT ByteString a (ExceptT SyncErr (ST s)) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
                Just ByteString
bs'' -> do
                  -- If we have input left, start up a new decoder.
                  Maybe ByteString -> ST s (IDecode s a)
k <- ConduitT
  ByteString
  a
  (ExceptT SyncErr (ST s))
  (Maybe ByteString -> ST s (IDecode s a))
forall s.
ConduitT
  ByteString
  a
  (ExceptT SyncErr (ST s))
  (Maybe ByteString -> ST s (IDecode s a))
newDecoder
                  ByteString
-> (Maybe ByteString -> ST s (IDecode s a))
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall s.
ByteString
-> (Maybe ByteString -> ST s (IDecode s a))
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
loop ByteString
bs'' Maybe ByteString -> ST s (IDecode s a)
k
            else do
              -- We have leftovers, start a new decoder and use those.
              Maybe ByteString -> ST s (IDecode s a)
k <- ConduitT
  ByteString
  a
  (ExceptT SyncErr (ST s))
  (Maybe ByteString -> ST s (IDecode s a))
forall s.
ConduitT
  ByteString
  a
  (ExceptT SyncErr (ST s))
  (Maybe ByteString -> ST s (IDecode s a))
newDecoder
              ByteString
-> (Maybe ByteString -> ST s (IDecode s a))
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
forall s.
ByteString
-> (Maybe ByteString -> ST s (IDecode s a))
-> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
loop ByteString
rem Maybe ByteString -> ST s (IDecode s a)
k

------------------------------------------------------------------------------------------------------------------------
-- Servant stuff

type SyncAPI = ("ucm" Servant.:> "v2" Servant.:> "sync" Servant.:> SyncV2.API)

syncAPI :: Proxy SyncAPI
syncAPI :: Proxy SyncAPI
syncAPI = forall t. Proxy t
forall {k} (t :: k). Proxy t
Proxy @SyncAPI

downloadEntitiesStreamClientM :: SyncV2.DownloadEntitiesRequest -> Servant.ClientM (Servant.SourceT IO (CBORStream SyncV2.DownloadEntitiesChunk))
causalDependenciesStreamClientM :: SyncV2.CausalDependenciesRequest -> Servant.ClientM (Servant.SourceT IO (CBORStream SyncV2.CausalDependenciesChunk))
SyncV2.Routes
  { $sel:downloadEntitiesStream:Routes :: forall mode.
Routes mode
-> mode :- ("entities" :> ("download" :> DownloadEntitiesStream))
downloadEntitiesStream = AsClientT ClientM
:- ("entities" :> ("download" :> DownloadEntitiesStream))
DownloadEntitiesRequest
-> ClientM (SourceIO (CBORStream DownloadEntitiesChunk))
downloadEntitiesStreamClientM,
    $sel:causalDependenciesStream:Routes :: forall mode.
Routes mode
-> mode
   :- ("entities" :> ("dependencies" :> CausalDependenciesStream))
causalDependenciesStream = AsClientT ClientM
:- ("entities" :> ("dependencies" :> CausalDependenciesStream))
CausalDependenciesRequest
-> ClientM (SourceIO (CBORStream CausalDependenciesChunk))
causalDependenciesStreamClientM
  } = Proxy SyncAPI -> Client ClientM SyncAPI
forall api.
HasClient ClientM api =>
Proxy api -> Client ClientM api
Servant.client Proxy SyncAPI
syncAPI

-- | Helper for running clientM that returns a stream of entities.
-- You MUST consume the stream within the callback, it will be closed when the callback returns.
withConduit :: forall r chunk. (CBOR.Serialise chunk) => Servant.ClientEnv -> (Stream () chunk -> StreamM r) -> Servant.ClientM (Servant.SourceIO (CBORStream chunk)) -> StreamM r
withConduit :: forall r chunk.
Serialise chunk =>
ClientEnv
-> (Stream () chunk -> StreamM r)
-> ClientM (SourceIO (CBORStream chunk))
-> StreamM r
withConduit ClientEnv
clientEnv Stream () chunk -> StreamM r
callback ClientM (SourceIO (CBORStream chunk))
clientM = do
  ResourceT IO (Either SyncErr r) -> StreamM r
forall e (m :: * -> *) a. m (Either e a) -> ExceptT e m a
ExceptT (ResourceT IO (Either SyncErr r) -> StreamM r)
-> ResourceT IO (Either SyncErr r) -> StreamM r
forall a b. (a -> b) -> a -> b
$ ((forall a. ResourceT IO a -> IO a) -> IO (Either SyncErr r))
-> ResourceT IO (Either SyncErr r)
forall b.
((forall a. ResourceT IO a -> IO a) -> IO b) -> ResourceT IO b
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO \forall a. ResourceT IO a -> IO a
runInIO -> do
    ClientM (SourceIO (CBORStream chunk))
-> ClientEnv
-> (Either ClientError (SourceIO (CBORStream chunk))
    -> IO (Either SyncErr r))
-> IO (Either SyncErr r)
forall a b.
ClientM a -> ClientEnv -> (Either ClientError a -> IO b) -> IO b
Servant.withClientM ClientM (SourceIO (CBORStream chunk))
clientM ClientEnv
clientEnv ((Either ClientError (SourceIO (CBORStream chunk))
  -> IO (Either SyncErr r))
 -> IO (Either SyncErr r))
-> (Either ClientError (SourceIO (CBORStream chunk))
    -> IO (Either SyncErr r))
-> IO (Either SyncErr r)
forall a b. (a -> b) -> a -> b
$ \case
      Left ClientError
err -> Either SyncErr r -> IO (Either SyncErr r)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either SyncErr r -> IO (Either SyncErr r))
-> (CodeserverTransportError -> Either SyncErr r)
-> CodeserverTransportError
-> IO (Either SyncErr r)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncErr -> Either SyncErr r
forall a b. a -> Either a b
Left (SyncErr -> Either SyncErr r)
-> (CodeserverTransportError -> SyncErr)
-> CodeserverTransportError
-> Either SyncErr r
forall b c a. (b -> c) -> (a -> b) -> a -> c
. CodeserverTransportError -> SyncErr
forall e. CodeserverTransportError -> SyncError e
TransportError (CodeserverTransportError -> IO (Either SyncErr r))
-> CodeserverTransportError -> IO (Either SyncErr r)
forall a b. (a -> b) -> a -> b
$ (ClientEnv -> ClientError -> CodeserverTransportError
handleClientError ClientEnv
clientEnv ClientError
err)
      Right SourceIO (CBORStream chunk)
sourceT -> do
        ConduitT () (CBORStream chunk) (ExceptT SyncErr (ResourceT IO)) ()
conduit <- IO
  (ConduitT
     () (CBORStream chunk) (ExceptT SyncErr (ResourceT IO)) ())
-> IO
     (ConduitT
        () (CBORStream chunk) (ExceptT SyncErr (ResourceT IO)) ())
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO
   (ConduitT
      () (CBORStream chunk) (ExceptT SyncErr (ResourceT IO)) ())
 -> IO
      (ConduitT
         () (CBORStream chunk) (ExceptT SyncErr (ResourceT IO)) ()))
-> IO
     (ConduitT
        () (CBORStream chunk) (ExceptT SyncErr (ResourceT IO)) ())
-> IO
     (ConduitT
        () (CBORStream chunk) (ExceptT SyncErr (ResourceT IO)) ())
forall a b. (a -> b) -> a -> b
$ SourceIO (CBORStream chunk)
-> IO
     (ConduitT
        () (CBORStream chunk) (ExceptT SyncErr (ResourceT IO)) ())
forall chunk a. FromSourceIO chunk a => SourceIO chunk -> IO a
Servant.fromSourceIO SourceIO (CBORStream chunk)
sourceT
        (ResourceT IO (Either SyncErr r) -> IO (Either SyncErr r)
forall a. ResourceT IO a -> IO a
runInIO (ResourceT IO (Either SyncErr r) -> IO (Either SyncErr r))
-> (StreamM r -> ResourceT IO (Either SyncErr r))
-> StreamM r
-> IO (Either SyncErr r)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamM r -> ResourceT IO (Either SyncErr r)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (StreamM r -> IO (Either SyncErr r))
-> StreamM r -> IO (Either SyncErr r)
forall a b. (a -> b) -> a -> b
$ Stream () chunk -> StreamM r
callback (ConduitT () (CBORStream chunk) (ExceptT SyncErr (ResourceT IO)) ()
conduit ConduitT () (CBORStream chunk) (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT
     (CBORStream chunk) chunk (ExceptT SyncErr (ResourceT IO)) ()
-> Stream () chunk
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| ConduitT
  (CBORStream chunk) chunk (ExceptT SyncErr (ResourceT IO)) ()
forall a. Serialise a => Stream (CBORStream a) a
unpackCBORBytesStream))

unpackCBORBytesStream :: (CBOR.Serialise a) => Stream (CBORStream a) a
unpackCBORBytesStream :: forall a. Serialise a => Stream (CBORStream a) a
unpackCBORBytesStream =
  (CBORStream a -> ByteString)
-> ConduitT
     (CBORStream a) ByteString (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a b. Monad m => (a -> b) -> ConduitT a b m ()
C.map (ByteString -> ByteString
BL.toStrict (ByteString -> ByteString)
-> (CBORStream a -> ByteString) -> CBORStream a -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. Coercible a b => a -> b
forall a b. Coercible a b => a -> b
coerce @_ @BL.ByteString) ConduitT
  (CBORStream a) ByteString (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT ByteString a (ExceptT SyncErr (ResourceT IO)) ()
-> ConduitT (CBORStream a) a (ExceptT SyncErr (ResourceT IO)) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| ConduitT ByteString a (ExceptT SyncErr (ResourceT IO)) ()
forall a. Serialise a => Stream ByteString a
decodeUnframedEntities

handleClientError :: Servant.ClientEnv -> Servant.ClientError -> CodeserverTransportError
handleClientError :: ClientEnv -> ClientError -> CodeserverTransportError
handleClientError ClientEnv
clientEnv ClientError
err =
  case ClientError
err of
    Servant.FailureResponse RequestF () (BaseUrl, ByteString)
_req Response
resp ->
      case Status -> Int
HTTP.statusCode (Status -> Int) -> Status -> Int
forall a b. (a -> b) -> a -> b
$ Response -> Status
forall a. ResponseF a -> Status
Servant.responseStatusCode Response
resp of
        Int
401 -> BaseUrl -> CodeserverTransportError
Unauthenticated (ClientEnv -> BaseUrl
Servant.baseUrl ClientEnv
clientEnv)
        -- The server should provide semantically relevant permission-denied messages
        -- when possible, but this should catch any we miss.
        Int
403 -> Text -> CodeserverTransportError
PermissionDenied (Text -> Text
Text.Lazy.toStrict (Text -> Text) -> (ByteString -> Text) -> ByteString -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Text
Text.Lazy.decodeUtf8 (ByteString -> Text) -> ByteString -> Text
forall a b. (a -> b) -> a -> b
$ Response -> ByteString
forall a. ResponseF a -> a
Servant.responseBody Response
resp)
        Int
408 -> CodeserverTransportError
Timeout
        Int
429 -> CodeserverTransportError
RateLimitExceeded
        Int
504 -> CodeserverTransportError
Timeout
        Int
_ -> Response -> CodeserverTransportError
UnexpectedResponse Response
resp
    Servant.DecodeFailure Text
msg Response
resp -> Text -> Response -> CodeserverTransportError
DecodeFailure Text
msg Response
resp
    Servant.UnsupportedContentType MediaType
_ct Response
resp -> Response -> CodeserverTransportError
UnexpectedResponse Response
resp
    Servant.InvalidContentTypeHeader Response
resp -> Response -> CodeserverTransportError
UnexpectedResponse Response
resp
    Servant.ConnectionError SomeException
_ -> BaseUrl -> CodeserverTransportError
UnreachableCodeserver (ClientEnv -> BaseUrl
Servant.baseUrl ClientEnv
clientEnv)

-- | Stream entities from the codeserver.
httpStreamEntities ::
  Auth.AuthenticatedHttpClient ->
  Servant.BaseUrl ->
  SyncV2.DownloadEntitiesRequest ->
  (SyncV2.StreamInitInfo -> Stream () SyncV2.EntityChunk -> StreamM ()) ->
  StreamM ()
httpStreamEntities :: AuthenticatedHttpClient
-> BaseUrl
-> DownloadEntitiesRequest
-> (StreamInitInfo
    -> Stream () EntityChunk -> ExceptT SyncErr (ResourceT IO) ())
-> ExceptT SyncErr (ResourceT IO) ()
httpStreamEntities (Auth.AuthenticatedHttpClient Manager
httpClient) BaseUrl
unisonShareUrl DownloadEntitiesRequest
req StreamInitInfo
-> Stream () EntityChunk -> ExceptT SyncErr (ResourceT IO) ()
callback = do
  let clientEnv :: ClientEnv
clientEnv =
        (Manager -> BaseUrl -> ClientEnv
Servant.mkClientEnv Manager
httpClient BaseUrl
unisonShareUrl)
          { Servant.makeClientRequest = \BaseUrl
url Request
request ->
              -- Disable client-side timeouts
              (BaseUrl -> Request -> IO Request
Servant.defaultMakeClientRequest BaseUrl
url Request
request)
                IO Request -> (Request -> Request) -> IO Request
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \Request
r ->
                  Request
r
                    { Http.Client.responseTimeout = Http.Client.responseTimeoutNone
                    }
          }
  (DownloadEntitiesRequest
-> ClientM (SourceIO (CBORStream DownloadEntitiesChunk))
downloadEntitiesStreamClientM DownloadEntitiesRequest
req) ClientM (SourceIO (CBORStream DownloadEntitiesChunk))
-> (ClientM (SourceIO (CBORStream DownloadEntitiesChunk))
    -> ExceptT SyncErr (ResourceT IO) ())
-> ExceptT SyncErr (ResourceT IO) ()
forall a b. a -> (a -> b) -> b
& ClientEnv
-> (Stream () DownloadEntitiesChunk
    -> ExceptT SyncErr (ResourceT IO) ())
-> ClientM (SourceIO (CBORStream DownloadEntitiesChunk))
-> ExceptT SyncErr (ResourceT IO) ()
forall r chunk.
Serialise chunk =>
ClientEnv
-> (Stream () chunk -> StreamM r)
-> ClientM (SourceIO (CBORStream chunk))
-> StreamM r
withConduit ClientEnv
clientEnv \Stream () DownloadEntitiesChunk
stream -> do
    (StreamInitInfo
init, Stream () EntityChunk
entityStream) <- Stream () DownloadEntitiesChunk
-> StreamM (StreamInitInfo, Stream () EntityChunk)
initializeStream Stream () DownloadEntitiesChunk
stream
    StreamInitInfo
-> Stream () EntityChunk -> ExceptT SyncErr (ResourceT IO) ()
callback StreamInitInfo
init Stream () EntityChunk
entityStream

-- | Peel the header off the stream and parse the remaining entity chunks into EntityChunks
initializeStream :: Stream () SyncV2.DownloadEntitiesChunk -> StreamM (SyncV2.StreamInitInfo, Stream () SyncV2.EntityChunk)
initializeStream :: Stream () DownloadEntitiesChunk
-> StreamM (StreamInitInfo, Stream () EntityChunk)
initializeStream Stream () DownloadEntitiesChunk
stream = do
  (SealedConduitT
  () DownloadEntitiesChunk (ExceptT SyncErr (ResourceT IO)) ()
streamRemainder, Maybe DownloadEntitiesChunk
init) <- Stream () DownloadEntitiesChunk
stream Stream () DownloadEntitiesChunk
-> ConduitT
     DownloadEntitiesChunk
     Void
     (ExceptT SyncErr (ResourceT IO))
     (Maybe DownloadEntitiesChunk)
-> ExceptT
     SyncErr
     (ResourceT IO)
     (SealedConduitT
        () DownloadEntitiesChunk (ExceptT SyncErr (ResourceT IO)) (),
      Maybe DownloadEntitiesChunk)
forall (m :: * -> *) a b.
Monad m =>
ConduitT () a m ()
-> ConduitT a Void m b -> m (SealedConduitT () a m (), b)
C.$$+ ConduitT
  DownloadEntitiesChunk
  Void
  (ExceptT SyncErr (ResourceT IO))
  (Maybe DownloadEntitiesChunk)
forall (m :: * -> *) i o. Monad m => ConduitT i o m (Maybe i)
C.headC
  case Maybe DownloadEntitiesChunk
init of
    Maybe DownloadEntitiesChunk
Nothing -> SyncErr -> StreamM (StreamInitInfo, Stream () EntityChunk)
forall a. SyncErr -> ExceptT SyncErr (ResourceT IO) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> StreamM (StreamInitInfo, Stream () EntityChunk))
-> (SyncError -> SyncErr)
-> SyncError
-> StreamM (StreamInitInfo, Stream () EntityChunk)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (SyncError -> PullError) -> SyncError -> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncError -> PullError
SyncV2.PullError'Sync (SyncError -> StreamM (StreamInitInfo, Stream () EntityChunk))
-> SyncError -> StreamM (StreamInitInfo, Stream () EntityChunk)
forall a b. (a -> b) -> a -> b
$ SyncError
SyncV2.SyncErrorMissingInitialChunk
    Just DownloadEntitiesChunk
chunk -> do
      case DownloadEntitiesChunk
chunk of
        SyncV2.InitialC StreamInitInfo
info -> do
          let entityStream :: Stream () EntityChunk
entityStream = SealedConduitT
  () DownloadEntitiesChunk (ExceptT SyncErr (ResourceT IO)) ()
-> Stream () DownloadEntitiesChunk
forall (m :: * -> *) i o r.
Monad m =>
SealedConduitT i o m r -> ConduitT i o m r
C.unsealConduitT SealedConduitT
  () DownloadEntitiesChunk (ExceptT SyncErr (ResourceT IO)) ()
streamRemainder Stream () DownloadEntitiesChunk
-> ConduitT
     DownloadEntitiesChunk
     EntityChunk
     (ExceptT SyncErr (ResourceT IO))
     ()
-> Stream () EntityChunk
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| (DownloadEntitiesChunk
 -> ExceptT SyncErr (ResourceT IO) EntityChunk)
-> ConduitT
     DownloadEntitiesChunk
     EntityChunk
     (ExceptT SyncErr (ResourceT IO))
     ()
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> ConduitT a b m ()
C.mapM DownloadEntitiesChunk -> ExceptT SyncErr (ResourceT IO) EntityChunk
parseEntity
          (StreamInitInfo, Stream () EntityChunk)
-> StreamM (StreamInitInfo, Stream () EntityChunk)
forall a. a -> ExceptT SyncErr (ResourceT IO) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((StreamInitInfo, Stream () EntityChunk)
 -> StreamM (StreamInitInfo, Stream () EntityChunk))
-> (StreamInitInfo, Stream () EntityChunk)
-> StreamM (StreamInitInfo, Stream () EntityChunk)
forall a b. (a -> b) -> a -> b
$ (StreamInitInfo
info, Stream () EntityChunk
entityStream)
        SyncV2.EntityC EntityChunk
_ -> do
          SyncErr -> StreamM (StreamInitInfo, Stream () EntityChunk)
forall a. SyncErr -> ExceptT SyncErr (ResourceT IO) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> StreamM (StreamInitInfo, Stream () EntityChunk))
-> (SyncError -> SyncErr)
-> SyncError
-> StreamM (StreamInitInfo, Stream () EntityChunk)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (SyncError -> PullError) -> SyncError -> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SyncError -> PullError
SyncV2.PullError'Sync (SyncError -> StreamM (StreamInitInfo, Stream () EntityChunk))
-> SyncError -> StreamM (StreamInitInfo, Stream () EntityChunk)
forall a b. (a -> b) -> a -> b
$ SyncError
SyncV2.SyncErrorMissingInitialChunk
        SyncV2.ErrorC (SyncV2.ErrorChunk DownloadEntitiesError
err) -> SyncErr -> StreamM (StreamInitInfo, Stream () EntityChunk)
forall a. SyncErr -> ExceptT SyncErr (ResourceT IO) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> StreamM (StreamInitInfo, Stream () EntityChunk))
-> (DownloadEntitiesError -> SyncErr)
-> DownloadEntitiesError
-> StreamM (StreamInitInfo, Stream () EntityChunk)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> SyncErr)
-> (DownloadEntitiesError -> PullError)
-> DownloadEntitiesError
-> SyncErr
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DownloadEntitiesError -> PullError
SyncV2.PullError'DownloadEntities (DownloadEntitiesError
 -> StreamM (StreamInitInfo, Stream () EntityChunk))
-> DownloadEntitiesError
-> StreamM (StreamInitInfo, Stream () EntityChunk)
forall a b. (a -> b) -> a -> b
$ DownloadEntitiesError
err
  where
    parseEntity :: SyncV2.DownloadEntitiesChunk -> StreamM SyncV2.EntityChunk
    parseEntity :: DownloadEntitiesChunk -> ExceptT SyncErr (ResourceT IO) EntityChunk
parseEntity = \case
      SyncV2.EntityC EntityChunk
chunk -> EntityChunk -> ExceptT SyncErr (ResourceT IO) EntityChunk
forall a. a -> ExceptT SyncErr (ResourceT IO) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure EntityChunk
chunk
      SyncV2.ErrorC (SyncV2.ErrorChunk DownloadEntitiesError
err) -> SyncErr -> ExceptT SyncErr (ResourceT IO) EntityChunk
forall a. SyncErr -> ExceptT SyncErr (ResourceT IO) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> ExceptT SyncErr (ResourceT IO) EntityChunk)
-> (PullError -> SyncErr)
-> PullError
-> ExceptT SyncErr (ResourceT IO) EntityChunk
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> ExceptT SyncErr (ResourceT IO) EntityChunk)
-> PullError -> ExceptT SyncErr (ResourceT IO) EntityChunk
forall a b. (a -> b) -> a -> b
$ DownloadEntitiesError -> PullError
SyncV2.PullError'DownloadEntities DownloadEntitiesError
err
      SyncV2.InitialC {} -> SyncErr -> ExceptT SyncErr (ResourceT IO) EntityChunk
forall a. SyncErr -> ExceptT SyncErr (ResourceT IO) a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (SyncErr -> ExceptT SyncErr (ResourceT IO) EntityChunk)
-> (PullError -> SyncErr)
-> PullError
-> ExceptT SyncErr (ResourceT IO) EntityChunk
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PullError -> SyncErr
forall e. e -> SyncError e
SyncError (PullError -> ExceptT SyncErr (ResourceT IO) EntityChunk)
-> PullError -> ExceptT SyncErr (ResourceT IO) EntityChunk
forall a b. (a -> b) -> a -> b
$ SyncError -> PullError
SyncV2.PullError'Sync SyncError
SyncV2.SyncErrorMisplacedInitialChunk

------------------------------------------------------------------------------------------------------------------------
-- Causal Dependency negotiation
------------------------------------------------------------------------------------------------------------------------

httpStreamCausalDependencies ::
  forall r.
  Auth.AuthenticatedHttpClient ->
  Servant.BaseUrl ->
  SyncV2.CausalDependenciesRequest ->
  (Stream () SyncV2.CausalDependenciesChunk -> StreamM r) ->
  StreamM r
httpStreamCausalDependencies :: forall r.
AuthenticatedHttpClient
-> BaseUrl
-> CausalDependenciesRequest
-> (Stream () CausalDependenciesChunk -> StreamM r)
-> StreamM r
httpStreamCausalDependencies (Auth.AuthenticatedHttpClient Manager
httpClient) BaseUrl
unisonShareUrl CausalDependenciesRequest
req Stream () CausalDependenciesChunk -> StreamM r
callback = do
  let clientEnv :: ClientEnv
clientEnv =
        (Manager -> BaseUrl -> ClientEnv
Servant.mkClientEnv Manager
httpClient BaseUrl
unisonShareUrl)
          { Servant.makeClientRequest = \BaseUrl
url Request
request ->
              -- Disable client-side timeouts
              (BaseUrl -> Request -> IO Request
Servant.defaultMakeClientRequest BaseUrl
url Request
request)
                IO Request -> (Request -> Request) -> IO Request
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \Request
r ->
                  Request
r
                    { Http.Client.responseTimeout = Http.Client.responseTimeoutNone
                    }
          }
  (CausalDependenciesRequest
-> ClientM (SourceIO (CBORStream CausalDependenciesChunk))
causalDependenciesStreamClientM CausalDependenciesRequest
req) ClientM (SourceIO (CBORStream CausalDependenciesChunk))
-> (ClientM (SourceIO (CBORStream CausalDependenciesChunk))
    -> StreamM r)
-> StreamM r
forall a b. a -> (a -> b) -> b
& ClientEnv
-> (Stream () CausalDependenciesChunk -> StreamM r)
-> ClientM (SourceIO (CBORStream CausalDependenciesChunk))
-> StreamM r
forall r chunk.
Serialise chunk =>
ClientEnv
-> (Stream () chunk -> StreamM r)
-> ClientM (SourceIO (CBORStream chunk))
-> StreamM r
withConduit ClientEnv
clientEnv Stream () CausalDependenciesChunk -> StreamM r
callback

-- | Ask Share for the dependencies of a given hash jwt,
-- then filter them to get the set of causals which we have and don't need sent.
negotiateKnownCausals ::
  -- | The Unison Share URL.
  Servant.BaseUrl ->
  -- | The branch to download from.
  SyncV2.BranchRef ->
  -- | The hash to download.
  Share.HashJWT ->
  Cli (Either (SyncError SyncV2.PullError) (Set Hash32))
negotiateKnownCausals :: BaseUrl
-> BranchRef -> HashJWT -> Cli (Either SyncErr (Set Hash32))
negotiateKnownCausals BaseUrl
unisonShareUrl BranchRef
branchRef HashJWT
hashJwt = do
  Cli.Env {AuthenticatedHttpClient
$sel:authHTTPClient:Env :: Env -> AuthenticatedHttpClient
authHTTPClient :: AuthenticatedHttpClient
authHTTPClient, Codebase IO Symbol Ann
$sel:codebase:Env :: Env -> Codebase IO Symbol Ann
codebase :: Codebase IO Symbol Ann
codebase} <- Cli Env
forall r (m :: * -> *). MonadReader r m => m r
ask
  IO () -> Cli ()
forall a. IO a -> Cli a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Cli ()) -> IO () -> Cli ()
forall a b. (a -> b) -> a -> b
$ Handle -> Text -> IO ()
Text.hPutStrLn Handle
IO.stderr (Text -> IO ()) -> Text -> IO ()
forall a b. (a -> b) -> a -> b
$ Text
"  🔎 Identifying missing entities..."
  FilePath
-> Cli (Either SyncErr (Set Hash32))
-> Cli (Either SyncErr (Set Hash32))
forall (m :: * -> *) a. MonadIO m => FilePath -> m a -> m a
Timing.time FilePath
"Causal Negotiation" (Cli (Either SyncErr (Set Hash32))
 -> Cli (Either SyncErr (Set Hash32)))
-> Cli (Either SyncErr (Set Hash32))
-> Cli (Either SyncErr (Set Hash32))
forall a b. (a -> b) -> a -> b
$ do
    IO (Either SyncErr (Set Hash32))
-> Cli (Either SyncErr (Set Hash32))
forall a. IO a -> Cli a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either SyncErr (Set Hash32))
 -> Cli (Either SyncErr (Set Hash32)))
-> (ExceptT SyncErr (ResourceT IO) (Set Hash32)
    -> IO (Either SyncErr (Set Hash32)))
-> ExceptT SyncErr (ResourceT IO) (Set Hash32)
-> Cli (Either SyncErr (Set Hash32))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ResourceT IO (Either SyncErr (Set Hash32))
-> IO (Either SyncErr (Set Hash32))
forall (m :: * -> *) a. MonadUnliftIO m => ResourceT m a -> m a
C.runResourceT (ResourceT IO (Either SyncErr (Set Hash32))
 -> IO (Either SyncErr (Set Hash32)))
-> (ExceptT SyncErr (ResourceT IO) (Set Hash32)
    -> ResourceT IO (Either SyncErr (Set Hash32)))
-> ExceptT SyncErr (ResourceT IO) (Set Hash32)
-> IO (Either SyncErr (Set Hash32))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ExceptT SyncErr (ResourceT IO) (Set Hash32)
-> ResourceT IO (Either SyncErr (Set Hash32))
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT SyncErr (ResourceT IO) (Set Hash32)
 -> Cli (Either SyncErr (Set Hash32)))
-> ExceptT SyncErr (ResourceT IO) (Set Hash32)
-> Cli (Either SyncErr (Set Hash32))
forall a b. (a -> b) -> a -> b
$ AuthenticatedHttpClient
-> BaseUrl
-> CausalDependenciesRequest
-> (Stream () CausalDependenciesChunk
    -> ExceptT SyncErr (ResourceT IO) (Set Hash32))
-> ExceptT SyncErr (ResourceT IO) (Set Hash32)
forall r.
AuthenticatedHttpClient
-> BaseUrl
-> CausalDependenciesRequest
-> (Stream () CausalDependenciesChunk -> StreamM r)
-> StreamM r
httpStreamCausalDependencies
      AuthenticatedHttpClient
authHTTPClient
      BaseUrl
unisonShareUrl
      SyncV2.CausalDependenciesRequest {BranchRef
branchRef :: BranchRef
$sel:branchRef:CausalDependenciesRequest :: BranchRef
branchRef, $sel:rootCausal:CausalDependenciesRequest :: HashJWT
rootCausal = HashJWT
hashJwt}
      \Stream () CausalDependenciesChunk
stream -> do
        [Hash32] -> Set Hash32
forall a. Ord a => [a] -> Set a
Set.fromList ([Hash32] -> Set Hash32)
-> ExceptT SyncErr (ResourceT IO) [Hash32]
-> ExceptT SyncErr (ResourceT IO) (Set Hash32)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ConduitT () Void (ExceptT SyncErr (ResourceT IO)) [Hash32]
-> ExceptT SyncErr (ResourceT IO) [Hash32]
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
C.runConduit (Stream () CausalDependenciesChunk
stream Stream () CausalDependenciesChunk
-> ConduitT
     CausalDependenciesChunk
     Void
     (ExceptT SyncErr (ResourceT IO))
     [Hash32]
-> ConduitT () Void (ExceptT SyncErr (ResourceT IO)) [Hash32]
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| (CausalDependenciesChunk -> (Hash32, DependencyType))
-> ConduitT
     CausalDependenciesChunk
     (Hash32, DependencyType)
     (ExceptT SyncErr (ResourceT IO))
     ()
forall (m :: * -> *) a b. Monad m => (a -> b) -> ConduitT a b m ()
C.map CausalDependenciesChunk -> (Hash32, DependencyType)
unpack ConduitT
  CausalDependenciesChunk
  (Hash32, DependencyType)
  (ExceptT SyncErr (ResourceT IO))
  ()
-> ConduitT
     (Hash32, DependencyType)
     Void
     (ExceptT SyncErr (ResourceT IO))
     [Hash32]
-> ConduitT
     CausalDependenciesChunk
     Void
     (ExceptT SyncErr (ResourceT IO))
     [Hash32]
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| Codebase IO Symbol Ann -> Stream (Hash32, DependencyType) Hash32
forall v a.
Codebase IO v a -> Stream (Hash32, DependencyType) Hash32
findKnownDeps Codebase IO Symbol Ann
codebase Stream (Hash32, DependencyType) Hash32
-> ConduitT Hash32 Void (ExceptT SyncErr (ResourceT IO)) [Hash32]
-> ConduitT
     (Hash32, DependencyType)
     Void
     (ExceptT SyncErr (ResourceT IO))
     [Hash32]
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
C..| ConduitT Hash32 Void (ExceptT SyncErr (ResourceT IO)) [Hash32]
forall (m :: * -> *) a o. Monad m => ConduitT a o m [a]
C.sinkList)
  where
    -- Go through the dependencies of the remote root from top-down, yielding all causal hashes that we already
    -- have until we find one in the causal spine we already have, then yield that one and stop since we'll implicitly
    -- have all of its dependencies.
    findKnownDeps :: Codebase.Codebase IO v a -> Stream (Hash32, DependencyType) Hash32
    findKnownDeps :: forall v a.
Codebase IO v a -> Stream (Hash32, DependencyType) Hash32
findKnownDeps Codebase IO v a
codebase = do
      ConduitT
  (Hash32, DependencyType)
  Hash32
  (ExceptT SyncErr (ResourceT IO))
  (Maybe (Hash32, DependencyType))
forall (m :: * -> *) i o. Monad m => ConduitT i o m (Maybe i)
C.await ConduitT
  (Hash32, DependencyType)
  Hash32
  (ExceptT SyncErr (ResourceT IO))
  (Maybe (Hash32, DependencyType))
-> (Maybe (Hash32, DependencyType)
    -> Stream (Hash32, DependencyType) Hash32)
-> Stream (Hash32, DependencyType) Hash32
forall a b.
ConduitT
  (Hash32, DependencyType) Hash32 (ExceptT SyncErr (ResourceT IO)) a
-> (a
    -> ConduitT
         (Hash32, DependencyType) Hash32 (ExceptT SyncErr (ResourceT IO)) b)
-> ConduitT
     (Hash32, DependencyType) Hash32 (ExceptT SyncErr (ResourceT IO)) b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        Just (Hash32
hash, DependencyType
LibDependency) -> do
          -- We yield all lib dependencies we have, it's possible we don't have any of the causal spine in common, but _do_ have
          -- some of the libraries we can still save a lot of work.
          ConduitT
  (Hash32, DependencyType)
  Hash32
  (ExceptT SyncErr (ResourceT IO))
  Bool
-> Stream (Hash32, DependencyType) Hash32
-> Stream (Hash32, DependencyType) Hash32
forall (m :: * -> *). Monad m => m Bool -> m () -> m ()
whenM (ExceptT SyncErr (ResourceT IO) Bool
-> ConduitT
     (Hash32, DependencyType)
     Hash32
     (ExceptT SyncErr (ResourceT IO))
     Bool
forall (m :: * -> *) a.
Monad m =>
m a -> ConduitT (Hash32, DependencyType) Hash32 m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (ExceptT SyncErr (ResourceT IO) Bool
 -> ConduitT
      (Hash32, DependencyType)
      Hash32
      (ExceptT SyncErr (ResourceT IO))
      Bool)
-> ExceptT SyncErr (ResourceT IO) Bool
-> ConduitT
     (Hash32, DependencyType)
     Hash32
     (ExceptT SyncErr (ResourceT IO))
     Bool
forall a b. (a -> b) -> a -> b
$ Codebase IO v a -> Hash32 -> ExceptT SyncErr (ResourceT IO) Bool
forall v a.
Codebase IO v a -> Hash32 -> ExceptT SyncErr (ResourceT IO) Bool
haveCausalHash Codebase IO v a
codebase Hash32
hash) (Hash32 -> Stream (Hash32, DependencyType) Hash32
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
C.yield Hash32
hash)
          -- We continue regardless.
          Codebase IO v a -> Stream (Hash32, DependencyType) Hash32
forall v a.
Codebase IO v a -> Stream (Hash32, DependencyType) Hash32
findKnownDeps Codebase IO v a
codebase
        Just (Hash32
hash, DependencyType
CausalSpineDependency) -> do
          ExceptT SyncErr (ResourceT IO) Bool
-> ConduitT
     (Hash32, DependencyType)
     Hash32
     (ExceptT SyncErr (ResourceT IO))
     Bool
forall (m :: * -> *) a.
Monad m =>
m a -> ConduitT (Hash32, DependencyType) Hash32 m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (Codebase IO v a -> Hash32 -> ExceptT SyncErr (ResourceT IO) Bool
forall v a.
Codebase IO v a -> Hash32 -> ExceptT SyncErr (ResourceT IO) Bool
haveCausalHash Codebase IO v a
codebase Hash32
hash) ConduitT
  (Hash32, DependencyType)
  Hash32
  (ExceptT SyncErr (ResourceT IO))
  Bool
-> (Bool -> Stream (Hash32, DependencyType) Hash32)
-> Stream (Hash32, DependencyType) Hash32
forall a b.
ConduitT
  (Hash32, DependencyType) Hash32 (ExceptT SyncErr (ResourceT IO)) a
-> (a
    -> ConduitT
         (Hash32, DependencyType) Hash32 (ExceptT SyncErr (ResourceT IO)) b)
-> ConduitT
     (Hash32, DependencyType) Hash32 (ExceptT SyncErr (ResourceT IO)) b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
            Bool
True -> do
              -- If we find a causal hash we have in the spine, we don't need to look further,
              -- we can pass it on, then hang up the stream.
              Hash32 -> Stream (Hash32, DependencyType) Hash32
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
C.yield Hash32
hash
            Bool
False -> do
              -- Otherwise we keep looking, maybe we'll have one further in.
              Codebase IO v a -> Stream (Hash32, DependencyType) Hash32
forall v a.
Codebase IO v a -> Stream (Hash32, DependencyType) Hash32
findKnownDeps Codebase IO v a
codebase
        Maybe (Hash32, DependencyType)
Nothing -> () -> Stream (Hash32, DependencyType) Hash32
forall a.
a
-> ConduitT
     (Hash32, DependencyType) Hash32 (ExceptT SyncErr (ResourceT IO)) a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
    unpack :: SyncV2.CausalDependenciesChunk -> (Hash32, DependencyType)
    unpack :: CausalDependenciesChunk -> (Hash32, DependencyType)
unpack = \case
      SyncV2.CausalHashDepC {Hash32
causalHash :: Hash32
$sel:causalHash:CausalHashDepC :: CausalDependenciesChunk -> Hash32
causalHash, DependencyType
dependencyType :: DependencyType
$sel:dependencyType:CausalHashDepC :: CausalDependenciesChunk -> DependencyType
dependencyType} -> (Hash32
causalHash, DependencyType
dependencyType)
    haveCausalHash :: Codebase.Codebase IO v a -> Hash32 -> StreamM Bool
    haveCausalHash :: forall v a.
Codebase IO v a -> Hash32 -> ExceptT SyncErr (ResourceT IO) Bool
haveCausalHash Codebase IO v a
codebase Hash32
causalHash = do
      IO Bool -> ExceptT SyncErr (ResourceT IO) Bool
forall a. IO a -> ExceptT SyncErr (ResourceT IO) a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> ExceptT SyncErr (ResourceT IO) Bool)
-> IO Bool -> ExceptT SyncErr (ResourceT IO) Bool
forall a b. (a -> b) -> a -> b
$ Codebase IO v a -> Transaction Bool -> IO Bool
forall (m :: * -> *) v a b.
MonadIO m =>
Codebase m v a -> Transaction b -> m b
Codebase.runTransaction Codebase IO v a
codebase do
        Hash32 -> Transaction Bool
Q.causalExistsByHash32 Hash32
causalHash

------------------------------------------------------------------------------------------------------------------------
-- Progress Tracking
------------------------------------------------------------------------------------------------------------------------

counterProgress :: (MonadIO m, MonadUnliftIO n) => (Int -> Text) -> ((Int -> m ()) -> n a) -> n a
counterProgress :: forall (m :: * -> *) (n :: * -> *) a.
(MonadIO m, MonadUnliftIO n) =>
(Int -> Text) -> ((Int -> m ()) -> n a) -> n a
counterProgress Int -> Text
msgBuilder (Int -> m ()) -> n a
action = do
  TVar Int
counterVar <- Int -> n (TVar Int)
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
IO.newTVarIO (Int
0 :: Int)
  ((forall a. n a -> IO a) -> IO a) -> n a
forall b. ((forall a. n a -> IO a) -> IO b) -> n b
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
IO.withRunInIO \forall a. n a -> IO a
toIO -> do
    IO a -> IO a
forall (m :: * -> *) a. (MonadIO m, MonadMask m) => m a -> m a
Console.Regions.displayConsoleRegions do
      RegionLayout -> (ConsoleRegion -> IO a) -> IO a
forall (m :: * -> *) a.
(MonadIO m, MonadMask m) =>
RegionLayout -> (ConsoleRegion -> m a) -> m a
Console.Regions.withConsoleRegion RegionLayout
Console.Regions.Linear \ConsoleRegion
region -> do
        ConsoleRegion -> STM Text -> IO ()
forall v (m :: * -> *).
(ToRegionContent v, LiftRegion m) =>
ConsoleRegion -> v -> m ()
Console.Regions.setConsoleRegion ConsoleRegion
region do
          Int
num <- TVar Int -> STM Int
forall a. TVar a -> STM a
IO.readTVar TVar Int
counterVar
          pure $ Int -> Text
msgBuilder Int
num
        n a -> IO a
forall a. n a -> IO a
toIO (n a -> IO a) -> n a -> IO a
forall a b. (a -> b) -> a -> b
$ (Int -> m ()) -> n a
action ((Int -> m ()) -> n a) -> (Int -> m ()) -> n a
forall a b. (a -> b) -> a -> b
$ \Int
i -> do
          IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
IO.atomically (TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
IO.modifyTVar' TVar Int
counterVar (Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
i))

-- | Track how many entities have been downloaded using a counter stream.
withStreamProgressCallback :: (MonadIO m, MonadUnliftIO n) => Maybe Int -> (ConduitT i i m () -> n a) -> n a
withStreamProgressCallback :: forall (m :: * -> *) (n :: * -> *) i a.
(MonadIO m, MonadUnliftIO n) =>
Maybe Int -> (ConduitT i i m () -> n a) -> n a
withStreamProgressCallback Maybe Int
total ConduitT i i m () -> n a
action = do
  let msg :: Int -> Text
msg Int
n = Text
"\n  📦 Unpacked  " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Int -> Text
forall a. Show a => a -> Text
tShow Int
n Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> (Int -> Text) -> Maybe Int -> Text
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Text
"" (\Int
total -> Text
" / " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Int -> Text
forall a. Show a => a -> Text
tShow Int
total) Maybe Int
total Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" entities...\n\n"
  let action' :: (Int -> m ()) -> n a
action' Int -> m ()
f = ConduitT i i m () -> n a
action ((i -> m ()) -> ConduitT i i m ()
forall (m :: * -> *) a. Monad m => (a -> m ()) -> ConduitT a a m ()
C.iterM \i
_i -> Int -> m ()
f Int
1)
  (Int -> Text) -> ((Int -> m ()) -> n a) -> n a
forall (m :: * -> *) (n :: * -> *) a.
(MonadIO m, MonadUnliftIO n) =>
(Int -> Text) -> ((Int -> m ()) -> n a) -> n a
counterProgress Int -> Text
msg (Int -> m ()) -> n a
action'

-- | Track how many entities have been saved.
withEntitySavingCallback :: (MonadUnliftIO m) => Maybe Int -> ((Int -> m ()) -> m a) -> m a
withEntitySavingCallback :: forall (m :: * -> *) a.
MonadUnliftIO m =>
Maybe Int -> ((Int -> m ()) -> m a) -> m a
withEntitySavingCallback Maybe Int
total (Int -> m ()) -> m a
action = do
  let msg :: Int -> Text
msg Int
n = Text
"\n  💾 Saved  " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Int -> Text
forall a. Show a => a -> Text
tShow Int
n Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> (Int -> Text) -> Maybe Int -> Text
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Text
"" (\Int
total -> Text
" / " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Int -> Text
forall a. Show a => a -> Text
tShow Int
total) Maybe Int
total Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" new entities...\n\n"
  (Int -> Text) -> ((Int -> m ()) -> m a) -> m a
forall (m :: * -> *) (n :: * -> *) a.
(MonadIO m, MonadUnliftIO n) =>
(Int -> Text) -> ((Int -> m ()) -> n a) -> n a
counterProgress Int -> Text
msg (Int -> m ()) -> m a
action

-- | Track how many entities have been loaded.
withEntityLoadingCallback :: (MonadUnliftIO m) => ((Int -> m ()) -> m a) -> m a
withEntityLoadingCallback :: forall (m :: * -> *) a.
MonadUnliftIO m =>
((Int -> m ()) -> m a) -> m a
withEntityLoadingCallback (Int -> m ()) -> m a
action = do
  let msg :: a -> Text
msg a
n = Text
"\n  📦 Unpacked  " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> a -> Text
forall a. Show a => a -> Text
tShow a
n Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" entities...\n\n"
  (Int -> Text) -> ((Int -> m ()) -> m a) -> m a
forall (m :: * -> *) (n :: * -> *) a.
(MonadIO m, MonadUnliftIO n) =>
(Int -> Text) -> ((Int -> m ()) -> n a) -> n a
counterProgress Int -> Text
forall a. Show a => a -> Text
msg (Int -> m ()) -> m a
action