module Unison.Share.HistoryComments
( uploadHistoryComments,
downloadHistoryComments,
)
where
import Control.Concurrent.STM.TBMQueue (TBMQueue, closeTBMQueue, newTBMQueueIO, readTBMQueue, writeTBMQueue)
import Control.Monad.Reader
import Control.Monad.Trans.Maybe (mapMaybeT)
import Data.List.NonEmpty qualified as NEL
import Data.Monoid (All (..))
import Data.Set qualified as Set
import Data.Set.NonEmpty qualified as NESet
import Data.Text qualified as Text
import Data.Void
import Ki qualified
import Servant.API
import System.IO.Unsafe (unsafePerformIO)
import U.Codebase.Sqlite.Queries qualified as Q
import Unison.Auth.Tokens (newTokenProvider)
import Unison.Cli.Monad
import Unison.Cli.Monad qualified as Cli
import Unison.Codebase qualified as Codebase
import Unison.Debug qualified as Debug
import Unison.Hash (Hash)
import Unison.Hash32 (Hash32)
import Unison.Hash32 qualified as Hash32
import Unison.HashTags
import Unison.HistoryComment qualified as HC
import Unison.KeyThumbprint (KeyThumbprint (KeyThumbprint))
import Unison.Prelude
import Unison.Server.HistoryComments.Types
import Unison.Server.HistoryComments.Types qualified as Share
import Unison.Share.Codeserver qualified as Codeserver
import Unison.Sqlite qualified as Sqlite
import Unison.Sync.Types (RepoInfo)
import Unison.Util.Monoid (foldMapM)
import Unison.Util.Websockets
import UnliftIO.Environment (lookupEnv)
import UnliftIO.STM
-- | Buffer size handed to 'withCodeserverWebsocket' when the upload connection
-- is opened (presumably the capacity of the websocket message queues — confirm
-- against 'withCodeserverWebsocket').
msgBufferSize :: Int
msgBufferSize = 20
-- | Name of the environment variable that switches history-comment syncing on.
syncHistoryCommentsEnvKey :: String
syncHistoryCommentsEnvKey = "UNISON_SYNC_HISTORY_COMMENTS"
-- | Whether history comments should be synced at all.
--
-- 'True' only when the environment variable named by
-- 'syncHistoryCommentsEnvKey' is set to exactly @"true"@; an unset variable or
-- any other value yields 'False'.
--
-- The lookup goes through 'unsafePerformIO', so the environment is consulted
-- when this value is first forced. The @NOINLINE@ pragma keeps GHC from
-- inlining (and thereby duplicating) that lookup at use sites.
shouldSyncHistoryComments :: Bool
shouldSyncHistoryComments =
  unsafePerformIO $
    (== Just "true") <$> lookupEnv syncHistoryCommentsEnvKey
{-# NOINLINE shouldSyncHistoryComments #-}
-- | Upload the history comments reachable from a root causal to a codeserver.
--
-- Does nothing unless 'shouldSyncHistoryComments' is set. Otherwise a websocket
-- is opened against the codeserver's history-comment upload endpoint for
-- @repoInfo@ and three workers are forked inside a 'Ki.scoped' block:
--
-- * 'hashNotifyWorker' forwards batches of comment/revision hashes to the server,
-- * 'receiverWorker' queues whatever hashes the server asks for and records
--   errors in @errMVar@,
-- * 'uploaderWorker' sends the requested comments and revisions.
--
-- The calling thread streams every comment attached to the root causal into
-- the notify queue, closes that queue, and then waits until either the
-- uploader finishes or the receiver reports an error. Waiting on the error
-- variable as well means a failed session can no longer leave this thread
-- blocked forever on an uploader that will never be told to stop.
--
-- A websocket 'ConnectionException' is raised via 'error', as before. An error
-- reported by the receiver is logged under 'Debug.HistoryComments'.
uploadHistoryComments ::
  Hash32 ->
  Codeserver.CodeserverURI ->
  RepoInfo ->
  Cli ()
uploadHistoryComments rootCausalHash32 codeserver repoInfo
  | not shouldSyncHistoryComments = pure ()
  | otherwise = do
      Cli.Env {codebase, credentialManager} <- ask
      let path = "/ucm/v1/history-comments/upload?branchRef=" <> Text.unpack (toQueryParam repoInfo)
      let tokenProvider = newTokenProvider credentialManager
      result <-
        liftIO $
          withCodeserverWebsocket
            @IO
            @(MsgOrError Void HistoryCommentUploaderChunk)
            @(MsgOrError UploadCommentsResponse HistoryCommentDownloaderChunk)
            msgBufferSize
            codeserver
            tokenProvider
            path
            \Queues {send, receive} -> Ki.scoped \scope -> do
              commentHashesToSendQ <- newTBMQueueIO @(HistoryCommentHash32, [HistoryCommentRevisionHash32]) 100
              commentHashesToUploadQ <- newTBMQueueIO @(Either HistoryCommentHash32 HistoryCommentRevisionHash32) 100
              errMVar <- newEmptyTMVarIO
              _ <- Ki.fork scope (hashNotifyWorker send commentHashesToSendQ)
              uploaderThread <- Ki.fork scope (uploaderWorker codebase send commentHashesToUploadQ)
              _ <- Ki.fork scope (receiverWorker receive commentHashesToUploadQ errMVar)
              Codebase.runTransaction codebase $ do
                rootCausalHashId <-
                  Q.expectCausalHashIdByCausalHash . CausalHash $ Hash32.toHash rootCausalHash32
                Q.streamHistoryCommentsForCausal rootCausalHashId \getCommentIds -> do
                  -- Pull comments one at a time until the stream is exhausted.
                  let enqueueAll =
                        getCommentIds >>= \case
                          Nothing -> pure ()
                          Just (commentId, commentHash32) -> do
                            revisionHashes <- Q.commentRevisionHashes commentId
                            Debug.debugM Debug.HistoryComments "Queueing comment for checking" commentHash32
                            -- The queue is bounded, so this write waits for the
                            -- notify worker to make room while the transaction
                            -- is still open.
                            Sqlite.unsafeIO . atomically $
                              writeTBMQueue
                                commentHashesToSendQ
                                ( HistoryCommentHash32 commentHash32,
                                  HistoryCommentRevisionHash32 <$> revisionHashes
                                )
                            enqueueAll
                  enqueueAll
              -- No more hashes will be produced; lets the notify worker finish.
              atomically $ closeTBMQueue commentHashesToSendQ
              Debug.debugLogM Debug.HistoryComments "Uploading history comments: waiting for uploader thread to finish"
              -- Stop waiting as soon as the uploader is done OR an error shows up.
              atomically $ Ki.await uploaderThread <|> void (readTMVar errMVar)
              reportedError <- atomically $ tryReadTMVar errMVar
              for_ reportedError $
                Debug.debugM Debug.HistoryComments "History comment upload reported an error"
              Debug.debugLogM Debug.HistoryComments "Done; closing connection"
      case result of
        Left err -> error $ "uploadCommentsClient: " <> show err
        Right ((), _leftovers) -> pure ()
where
-- | Take everything currently available from the queue in one STM transaction.
--
-- Retries (blocks) while the queue is empty but still open, so an open queue
-- always yields at least one element. The 'Bool' is 'True' when the queue was
-- found closed and drained, i.e. no further elements will ever arrive.
flushTBMQueue :: TBMQueue a -> STM ([a], Bool)
flushTBMQueue queue =
  readTBMQueue queue >>= \case
    -- Closed and empty: nothing more to read, ever.
    Nothing -> pure ([], True)
    Just item -> do
      -- Keep draining; if the recursive call would block on an empty queue,
      -- stop here and report the queue as still open.
      (rest, closed) <- flushTBMQueue queue <|> pure ([], False)
      pure (item : rest, closed)
-- | Send every comment or revision whose hash arrives on the upload queue.
--
-- Each hash is resolved to its stored comment/revision in a database
-- transaction and the resulting chunk is passed to @send@. The worker stops
-- when the queue is closed and drained, or as soon as @send@ returns 'False'.
--
-- The send happens only after the transaction has finished: it is neither
-- idempotent nor guaranteed to return promptly, so it should not run inside a
-- transaction body via 'Sqlite.unsafeIO'.
uploaderWorker ::
  Codebase.Codebase IO v a ->
  ( MsgOrError err HistoryCommentUploaderChunk ->
    STM Bool
  ) ->
  TBMQueue (Either HistoryCommentHash32 HistoryCommentRevisionHash32) ->
  IO ()
uploaderWorker codebase send commentHashesToUploadQ = void (runMaybeT loop)
  where
    loop :: MaybeT IO ()
    loop = do
      -- 'Nothing' here (queue closed and empty) ends the loop.
      requested <- MaybeT . atomically $ readTBMQueue commentHashesToUploadQ
      (chunk, failureNote) <- liftIO $ loadChunk requested
      delivered <- atomically . send $ Msg chunk
      when (not delivered) $ Debug.debugLogM Debug.HistoryComments failureNote
      guard delivered
      loop

    -- Read the requested entity and pair it with the message to log should
    -- sending it fail.
    loadChunk ::
      Either HistoryCommentHash32 HistoryCommentRevisionHash32 ->
      IO (HistoryCommentUploaderChunk, String)
    loadChunk = \case
      Left (HistoryCommentHash32 commentHash) -> do
        Debug.debugM Debug.HistoryComments "Uploading comment for hash" commentHash
        comment <- Codebase.runTransaction codebase $ do
          commentId <- Q.expectHistoryCommentIdByHash32 commentHash
          Q.expectHistoryCommentById commentId
        pure (intoChunk (Left comment), "Failed to send the history comment, shutting down")
      Right (HistoryCommentRevisionHash32 revisionHash) -> do
        revision <- Codebase.runTransaction codebase $ do
          revisionId <- Q.expectHistoryCommentRevisionIdByHash32 revisionHash
          Q.expectHistoryCommentRevisionById revisionId
        pure (intoChunk (Right revision), "Failed to send history comment revision, shutting down")
-- | Handle messages coming back from the server during an upload.
--
-- * 'RequestCommentsChunk': queue each requested hash for upload.
-- * 'DoneCheckingHashesChunk': close the upload queue, then keep receiving.
-- * 'UserErr': record the error, then keep receiving.
-- * 'DeserialiseFailure': record the error and stop.
-- * 'Nothing' from @receive@: stop.
--
-- Only the first error is kept in @errMVar@; later ones are dropped rather
-- than blocking on an already-full variable. The upload queue is always closed
-- when this worker stops, so a consumer waiting on it is released.
receiverWorker ::
  STM (Maybe (MsgOrError UploadCommentsResponse HistoryCommentDownloaderChunk)) ->
  TBMQueue
    ( Either
        HistoryCommentHash32
        HistoryCommentRevisionHash32
    ) ->
  TMVar Text ->
  IO ()
receiverWorker receive commentHashesToUploadQ errMVar = do
  loop
  -- Nothing further can be requested once we stop receiving.
  closeUploadQueue
  where
    closeUploadQueue = atomically $ closeTBMQueue commentHashesToUploadQ

    recordError = void . atomically . tryPutTMVar errMVar

    loop =
      atomically receive >>= \case
        Nothing -> pure ()
        Just (Msg DoneCheckingHashesChunk) -> do
          closeUploadQueue
          loop
        Just (Msg (RequestCommentsChunk comments)) -> do
          -- One transaction per element: the queue is bounded, and a single
          -- transaction writing more elements than its capacity could never
          -- commit.
          for_ comments $ atomically . writeTBMQueue commentHashesToUploadQ
          loop
        Just (DeserialiseFailure msg) ->
          recordError $ "uploadHistoryComments: deserialisation failure: " <> msg
        Just (UserErr err) -> do
          recordError $ "uploadHistoryComments: server error: " <> tShow err
          loop
-- | Forward queued comment/revision hashes to the server in batches.
--
-- Each round drains whatever is available on the queue (waiting if it is empty
-- but open) and sends it as one 'PossiblyNewHashesChunk'. Once the queue has
-- been closed, or a send reports failure, a final 'DoneSendingHashesChunk' is
-- sent and the worker returns.
hashNotifyWorker :: (MsgOrError Void HistoryCommentUploaderChunk -> STM Bool) -> TBMQueue (HistoryCommentHash32, [HistoryCommentRevisionHash32]) -> IO ()
hashNotifyWorker send q = loop
  where
    loop = do
      finished <- atomically drainAndSend
      if finished
        then void . atomically $ send (Msg DoneSendingHashesChunk)
        else loop

    -- One STM step: drain the queue, send the batch (if any), and report
    -- whether the worker should stop.
    drainAndSend = do
      (hashesToCheck, queueClosed) <- flushTBMQueue q
      -- An empty batch folds to 'All True', i.e. counts as sent.
      All allSent <- foldMapM sendBatch (NEL.nonEmpty hashesToCheck)
      let finished = queueClosed || not allSent
      when finished $
        Debug.debugLogM Debug.HistoryComments "Hash notify worker: queue closed or server closed connection, no longer sending hashes"
      pure finished

    sendBatch possiblyNewHashes = do
      Debug.debugM Debug.HistoryComments "Sending possibly new hashes:" possiblyNewHashes
      All <$> send (Msg (PossiblyNewHashesChunk possiblyNewHashes))
-- | Convert a locally stored history comment ('Left') or history comment
-- revision ('Right') into the corresponding 'HistoryCommentUploaderChunk'.
--
-- Inferred type:
--
-- @
-- Either
--   (HC.HistoryComment UTCTime KeyThumbprint Hash32 Hash32)
--   (HC.HistoryCommentRevision Hash32 UTCTime Hash32)
--   -> HistoryCommentUploaderChunk
-- @
intoChunk = either commentChunk revisionChunk
  where
    -- A comment: the thumbprint newtype is unwrapped, and the local
    -- @causal@ / @commentId@ fields are renamed to the wire format's
    -- @causalHash@ / @commentHash@.
    commentChunk
      HC.HistoryComment
        { author,
          createdAt,
          authorThumbprint = KeyThumbprint thumbprintText,
          causal,
          commentId = commentHash
        } =
        HistoryCommentChunk
          Share.HistoryComment
            { author,
              createdAt,
              authorThumbprint = thumbprintText,
              Share.causalHash = causal,
              commentHash
            }

    -- A revision: the local @comment@ / @revisionId@ fields are renamed to
    -- the wire format's @commentHash@ / @revisionHash@.
    revisionChunk
      HC.HistoryCommentRevision
        { subject,
          content,
          createdAt,
          comment = commentHash,
          isHidden,
          authorSignature,
          revisionId
        } =
        HistoryCommentRevisionChunk
          Share.HistoryCommentRevision
            { subject,
              content,
              createdAt,
              isHidden,
              authorSignature,
              revisionHash = revisionId,
              commentHash
            }
-- | Read a batch of at most @size@ values from @action@ in a single STM
-- transaction.
--
-- @action@ is expected to behave like 'readTBMQueue': it retries while no
-- value is available, yields @Just value@ when one is, and yields @Nothing@
-- once the source is closed.
--
-- The result is the values read (in order) paired with a flag that is 'True'
-- when @action@ reported the source as closed while this batch was being
-- collected.
--
-- If no value is available for the very first read the whole transaction
-- retries, so the caller blocks until there is at least one value or the
-- source is closed. Once at least one value has been read, running out of
-- values simply ends the batch early.
fetchChunk :: (Show a) => Int -> STM (Maybe a) -> STM ([a], Bool)
fetchChunk size action = collect size
  where
    -- Batch is full: stop without having observed a close.
    collect 0 = pure ([], False)
    collect remaining =
      -- A retry raised by @action@ propagates out of this call; it is caught
      -- by the '<|>' of the enclosing call, if there is one.
      action >>= \case
        -- The source is closed.
        Nothing -> pure ([], True)
        Just item -> do
          Debug.debugM Debug.HistoryComments "Fetched value from queue" item
          -- Keep reading, but settle for what we have if the next read
          -- would block.
          (items, sourceClosed) <- collect (remaining - 1) <|> pure ([], False)
          pure (item : items, sourceClosed)
-- | Download history comments from the codeserver and insert them into the
-- local codebase.
--
-- Does nothing unless 'shouldSyncHistoryComments' is 'True'.
--
-- Otherwise a websocket is opened to the codeserver's
-- @/ucm/v1/history-comments/download@ endpoint, with @repoInfo@ passed as the
-- @branchRef@ query parameter, and three worker threads are forked in a
-- 'Ki.scoped' scope:
--
-- * @receiverWorker@ is given the websocket's receive action, the error
--   'TMVar' and both queues;
-- * @inserterWorker@ is given the codebase and the comments queue;
-- * @hashCheckingWorker@ is given the codebase, the websocket's send function
--   and the hashes queue.
--
-- The calling thread then waits until either the inserter thread finishes or
-- the receiver reports an error through the error 'TMVar'. Leaving the scope
-- stops any workers that are still running.
--
-- Calls 'error' if the websocket connection fails or if the receiver reported
-- an error.
downloadHistoryComments ::
  Codeserver.CodeserverURI ->
  RepoInfo ->
  Cli ()
downloadHistoryComments codeserver repoInfo
  | not shouldSyncHistoryComments = pure ()
  | otherwise = do
      Cli.Env {codebase, credentialManager} <- ask
      -- NOTE(review): the query parameter is appended without URL-encoding;
      -- confirm that 'RepoInfo' can never render characters such as '&' or '#'.
      let path = "/ucm/v1/history-comments/download?branchRef=" <> Text.unpack (toQueryParam repoInfo)
      let tokenProvider = newTokenProvider credentialManager
      result <-
        liftIO $
          withCodeserverWebsocket
            @IO
            @(MsgOrError Void HistoryCommentDownloaderChunk)
            @(MsgOrError DownloadCommentsResponse HistoryCommentUploaderChunk)
            msgBufferSize
            codeserver
            tokenProvider
            path
            \Queues {send, receive} -> Ki.scoped \scope -> do
              hashesToCheckQ <- newTBMQueueIO @(HistoryCommentHash32, [HistoryCommentRevisionHash32]) 100
              commentsQ <- newTBMQueueIO 100
              errMVar <- newEmptyTMVarIO
              _receiverThread <- Ki.fork scope $ receiverWorker receive errMVar hashesToCheckQ commentsQ
              inserterThread <- Ki.fork scope $ inserterWorker codebase commentsQ
              _hashCheckingThread <- Ki.fork scope $ hashCheckingWorker codebase send hashesToCheckQ
              Debug.debugLogM Debug.HistoryComments "Downloading history comments: waiting for inserter thread to finish"
              -- When the receiver reports an error it stops without closing
              -- the queues, so the inserter would never finish. Watch the
              -- error variable as well, so that a reported error ends the
              -- download instead of blocking here forever.
              atomically $
                (Just <$> readTMVar errMVar)
                  <|> (Nothing <$ Ki.await inserterThread)
      case result of
        Left connException -> error $ "downloadHistoryComments: " <> show connException
        -- The receiver's messages already carry a "downloadHistoryComments: " prefix.
        Right (Just errMsg, _leftovers) -> error (Text.unpack errMsg)
        Right (Nothing, _leftovers) -> pure ()
  where
-- | Drain @commentsQ@ in batches of up to @insertCommentBatchSize@ items and
-- insert every received comment and comment revision into the codebase.
--
-- Each non-empty batch is written in a single codebase transaction. The
-- worker returns once the queue has been reported closed.
inserterWorker ::
  Codebase.Codebase IO v a ->
  TBMQueue (Either HistoryComment HistoryCommentRevision) ->
  IO ()
inserterWorker codebase commentsQ = do
  drain
  Debug.debugLogM Debug.HistoryComments "Inserter worker finished"
  where
    -- Fetch one batch, persist it, and repeat until the queue is closed.
    drain = do
      (batch, queueClosed) <- atomically (fetchChunk insertCommentBatchSize (readTBMQueue commentsQ))
      unless (null batch) do
        Debug.debugM Debug.HistoryComments "Inserting comments chunk of size" (length batch)
        Codebase.runTransaction codebase (for_ batch insertItem)
      unless queueClosed drain

    -- Translate one wire-format item into its local representation and
    -- insert it.
    insertItem = \case
      Left HistoryComment {author, createdAt, authorThumbprint, causalHash, commentHash} -> do
        -- The local record refers to the causal by its database id.
        causalHashId <- Q.expectCausalHashIdForHash32 causalHash
        Q.insertHistoryComment
          HC.HistoryComment
            { author,
              createdAt,
              authorThumbprint = KeyThumbprint authorThumbprint,
              causal = causalHashId,
              commentId = HistoryCommentHash (into @Hash commentHash)
            }
      Right HistoryCommentRevision {subject, content, createdAt, isHidden, authorSignature, revisionHash, commentHash} ->
        Q.insertHistoryCommentRevision
          HC.HistoryCommentRevision
            { subject,
              content,
              createdAt,
              comment = HistoryCommentHash (into @Hash commentHash),
              isHidden,
              authorSignature,
              revisionId = HistoryCommentRevisionHash (into @Hash revisionHash)
            }
-- | Drain @hashesToCheckQ@ in batches of up to @insertCommentBatchSize@
-- items, work out which of the announced hashes are missing from the local
-- codebase, and request those through @send@.
--
-- For every announced comment:
--
-- * if the comment is already present, only those of its revisions that are
--   not present are requested;
-- * otherwise the comment and all of its announced revisions are requested.
--
-- Nothing is sent for a batch in which everything is already present. Once
-- the queue has been reported closed, a 'DoneCheckingHashesChunk' message is
-- sent and the worker returns. The 'Bool' returned by @send@ is ignored.
hashCheckingWorker ::
  Codebase.Codebase IO v a ->
  (MsgOrError err HistoryCommentDownloaderChunk -> STM Bool) ->
  TBMQueue (HistoryCommentHash32, [HistoryCommentRevisionHash32]) ->
  IO ()
hashCheckingWorker codebase send hashesToCheckQ = do
  checkBatches
  void (atomically (send (Msg DoneCheckingHashesChunk)))
  Debug.debugLogM Debug.HistoryComments "Hash checking worker finished"
  where
    -- Fetch one batch, request whatever is missing, and repeat until the
    -- queue is closed.
    checkBatches = do
      (batch, queueClosed) <- atomically (fetchChunk insertCommentBatchSize (readTBMQueue hashesToCheckQ))
      Debug.debugM Debug.HistoryComments "Checking hashes chunk of size" (length batch)
      unless (null batch) do
        missing <- Codebase.runTransaction codebase (foldMapM missingFor batch)
        -- Going through a set removes duplicates; an empty set means there
        -- is nothing to request.
        for_ (NESet.nonEmptySet (Set.fromList missing)) \missingSet ->
          void (atomically (send (Msg (RequestCommentsChunk missingSet))))
      unless queueClosed checkBatches

    -- The hashes to request for one announced comment and its revisions.
    missingFor (HistoryCommentHash32 commentHash, revisionHashes) =
      Q.haveHistoryComment commentHash >>= \case
        True -> wither missingRevision revisionHashes
        False -> pure (Left (HistoryCommentHash32 commentHash) : map Right revisionHashes)

    -- 'Just' the revision when it is not present locally.
    missingRevision (HistoryCommentRevisionHash32 revisionHash) =
      Q.haveHistoryCommentRevision revisionHash <&> \haveRevision ->
        if haveRevision
          then Nothing
          else Just (Right (HistoryCommentRevisionHash32 revisionHash))
-- | Pull messages off the incoming stream of a history-comment download and
-- route each one to the queue (or error slot) that handles it, until the
-- stream ends or an error message arrives.
--
-- * @recv@ yields the next incoming message; 'Nothing' means the stream has
--   ended, in which case both queues are closed.
-- * @errMVar@ is filled with a description when a 'DeserialiseFailure' or a
--   'UserErr' is received. The loop stops after reporting; the queues are
--   left open in that case.
-- * @hashesToCheckQ@ receives every hash pair carried by a
--   'PossiblyNewHashesChunk' and is closed on 'DoneSendingHashesChunk'.
-- * @commentsQ@ receives comments as 'Left' and revisions as 'Right'.
receiverWorker ::
  STM (Maybe (MsgOrError DownloadCommentsResponse HistoryCommentUploaderChunk)) ->
  TMVar Text ->
  TBMQueue
    ( HistoryCommentHash32,
      [HistoryCommentRevisionHash32]
    ) ->
  TBMQueue (Either HistoryComment HistoryCommentRevision) ->
  IO ()
receiverWorker recv errMVar hashesToCheckQ commentsQ = do
  let loop :: IO ()
      loop = do
        -- Each transaction consumes one message and returns the IO action to
        -- run next: either 'loop' again, or a no-op to stop.
        next <- atomically do
          recv >>= \case
            Nothing -> do
              closeTBMQueue hashesToCheckQ
              closeTBMQueue commentsQ
              pure (pure ())
            Just (DeserialiseFailure err) -> do
              putTMVar errMVar $ "downloadHistoryComments: deserialisation failure: " <> err
              pure (pure ())
            Just (UserErr err) -> do
              putTMVar errMVar $ "downloadHistoryComments: server error: " <> tShow err
              pure (pure ())
            Just (Msg msg) -> case msg of
              PossiblyNewHashesChunk hashesToCheck ->
                -- Enqueue the hashes one transaction at a time, outside of
                -- this transaction. 'writeTBMQueue' retries while the bounded
                -- queue is full, so writing the whole chunk atomically could
                -- only commit once the queue had room for every hash at once;
                -- a chunk larger than the queue's bound (chosen by the caller,
                -- not visible here) would block this worker forever.
                pure do
                  for_ hashesToCheck (atomically . writeTBMQueue hashesToCheckQ)
                  loop
              DoneSendingHashesChunk -> do
                closeTBMQueue hashesToCheckQ
                pure loop
              HistoryCommentChunk comment -> do
                writeTBMQueue commentsQ (Left comment)
                pure loop
              HistoryCommentRevisionChunk revision -> do
                writeTBMQueue commentsQ (Right revision)
                pure loop
        next
  loop
  Debug.debugLogM Debug.HistoryComments "Receiver worker finished"
-- | Batch size used when inserting comments.
-- NOTE(review): presumably the number of comments written per database
-- transaction; confirm against the use site.
insertCommentBatchSize :: Int
insertCommentBatchSize = 100