module Unison.Share.HistoryComments
( uploadHistoryComments,
downloadHistoryComments,
)
where
import Control.Concurrent.STM.TBMQueue (TBMQueue, closeTBMQueue, newTBMQueueIO, readTBMQueue, writeTBMQueue)
import Control.Monad.Reader
import Control.Monad.Trans.Maybe (mapMaybeT)
import Data.List.NonEmpty qualified as NEL
import Data.Monoid (All (..))
import Data.Set qualified as Set
import Data.Set.NonEmpty qualified as NESet
import Data.Text qualified as Text
import Data.Void
import Ki qualified
import Servant.API
import System.IO.Unsafe (unsafePerformIO)
import U.Codebase.Sqlite.Queries qualified as Q
import Unison.Auth.Tokens (newTokenProvider)
import Unison.Cli.Monad
import Unison.Cli.Monad qualified as Cli
import Unison.Codebase qualified as Codebase
import Unison.Debug qualified as Debug
import Unison.Hash (Hash)
import Unison.Hash32 (Hash32)
import Unison.Hash32 qualified as Hash32
import Unison.HashTags
import Unison.HistoryComment qualified as HC
import Unison.KeyThumbprint (KeyThumbprint (KeyThumbprint))
import Unison.Prelude
import Unison.Server.HistoryComments.Types
import Unison.Server.HistoryComments.Types qualified as Share
import Unison.Share.Codeserver qualified as Codeserver
import Unison.Sqlite qualified as Sqlite
import Unison.Sync.Types (RepoInfo)
import Unison.Util.Monoid (foldMapM)
import Unison.Util.Websockets
import UnliftIO.Environment (lookupEnv)
import UnliftIO.STM
-- | Buffer size handed to 'withCodeserverWebsocket' when opening the
-- history-comment websocket; the upload and the download connection both use it.
msgBufferSize :: Int
msgBufferSize = 20
-- | Name of the environment variable consulted by 'shouldSyncHistoryComments'
-- to decide whether history comments are synced at all.
syncHistoryCommentsEnvKey :: String
syncHistoryCommentsEnvKey = "UNISON_SYNC_HISTORY_COMMENTS"
-- | Whether history-comment syncing is switched on for this process.
--
-- 'True' only when the environment variable named by 'syncHistoryCommentsEnvKey'
-- is set to exactly @"true"@; an unset variable or any other value yields 'False'.
--
-- The environment is read through 'unsafePerformIO' so the flag can be used as a
-- plain 'Bool' in guards. The @NOINLINE@ pragma below keeps the lookup from being
-- duplicated at use sites by inlining.
shouldSyncHistoryComments :: Bool
shouldSyncHistoryComments =
  unsafePerformIO $ (== Just "true") <$> lookupEnv syncHistoryCommentsEnvKey
{-# NOINLINE shouldSyncHistoryComments #-}
-- | Upload the history comments reachable from a root causal to a codeserver.
--
-- This is a no-op unless 'shouldSyncHistoryComments' is set. Otherwise a websocket
-- is opened on @/ucm/v1/history-comments/upload@ and three workers run for the
-- lifetime of the connection:
--
-- * @hashNotifyWorker@ announces the hashes of local comments and their revisions
--   ('PossiblyNewHashesChunk'), followed by 'DoneSendingHashesChunk';
-- * @receiverWorker@ collects the hashes the server asks for
--   ('RequestCommentsChunk') until it sees 'DoneCheckingHashesChunk';
-- * @uploaderWorker@ loads every requested comment / revision from the codebase
--   and sends it.
--
-- The call returns once the uploader has drained its queue.
--
-- Arguments:
--
-- * @rootCausalHash32@ – causal whose comments are streamed with
--   'Q.streamHistoryCommentsForCausal';
-- * @codeserver@ – server to connect to;
-- * @repoInfo@ – rendered with 'toQueryParam' into the @branchRef@ query parameter.
--
-- If 'withCodeserverWebsocket' returns a 'Left', the failure is raised with 'error'.
uploadHistoryComments ::
  Hash32 ->
  Codeserver.CodeserverURI ->
  RepoInfo ->
  Cli ()
uploadHistoryComments rootCausalHash32 codeserver repoInfo
  | not shouldSyncHistoryComments = pure ()
  | otherwise = do
      Cli.Env {codebase, credentialManager} <- ask
      let path = "/ucm/v1/history-comments/upload?branchRef=" <> Text.unpack (toQueryParam repoInfo)
      let tokenProvider = newTokenProvider credentialManager
      result <- liftIO $ withCodeserverWebsocket @IO @(MsgOrError Void HistoryCommentUploaderChunk) @(MsgOrError UploadCommentsResponse HistoryCommentDownloaderChunk) msgBufferSize codeserver tokenProvider path \Queues {send, receive} -> Ki.scoped \scope -> do
        -- Hashes we still have to announce to the server.
        commentHashesToSendQ <- newTBMQueueIO @(HistoryCommentHash32, [HistoryCommentRevisionHash32]) 100
        -- Hashes the server asked us to upload.
        commentHashesToUploadQ <- newTBMQueueIO @(Either HistoryCommentHash32 HistoryCommentRevisionHash32) 100
        -- Holds the first error reported by the receiver, if any.
        errMVar <- newEmptyTMVarIO
        _ <- Ki.fork scope (hashNotifyWorker send commentHashesToSendQ)
        uploaderThread <- Ki.fork scope (uploaderWorker codebase send commentHashesToUploadQ)
        _ <- Ki.fork scope (receiverWorker receive commentHashesToUploadQ errMVar)
        Codebase.runTransaction codebase $ do
          rootCausalHashId <- Q.expectCausalHashIdByCausalHash $ CausalHash $ Hash32.toHash rootCausalHash32
          Q.streamHistoryCommentsForCausal rootCausalHashId \getCommentIds -> do
            -- Pull comments from the cursor one at a time until it is exhausted,
            -- queueing each comment hash together with its revision hashes.
            let loop = do
                  queued <- runMaybeT $ do
                    (commentId, commentHash32) <- MaybeT getCommentIds
                    revisionHashes <- lift $ Q.commentRevisionHashes commentId
                    Debug.debugM Debug.HistoryComments "Queueing comment for checking" commentHash32
                    lift . Sqlite.unsafeIO $ atomically $ writeTBMQueue commentHashesToSendQ (HistoryCommentHash32 commentHash32, HistoryCommentRevisionHash32 <$> revisionHashes)
                  case queued of
                    Just () -> loop
                    Nothing -> pure ()
            loop
        -- Closing the queue is what tells hashNotifyWorker to send DoneSendingHashesChunk.
        atomically $ closeTBMQueue commentHashesToSendQ
        Debug.debugLogM Debug.HistoryComments "Uploading history comments: waiting for uploader thread to finish"
        atomically $ Ki.await uploaderThread
        -- NOTE(review): errors recorded by the receiver are only surfaced in the debug
        -- log here; decide whether a server-reported error should fail the command.
        atomically (tryReadTMVar errMVar) >>= \case
          Nothing -> pure ()
          Just errMsg -> Debug.debugM Debug.HistoryComments "Upload finished, but the receiver recorded an error:" errMsg
        Debug.debugLogM Debug.HistoryComments "Done; closing connection"
      case result of
        Left err -> error $ "uploadCommentsClient: " <> show err
        Right ((), _leftovers) -> pure ()
  where
    -- Read everything currently available from the queue in one transaction.
    -- Retries (blocks) while the queue is open and empty. The Bool is True when the
    -- queue was found closed, i.e. no further values will ever arrive.
    flushTBMQueue :: TBMQueue a -> STM ([a], Bool)
    flushTBMQueue q =
      readTBMQueue q >>= \case
        Nothing -> pure ([], True)
        Just v -> do
          -- Once we hold at least one value, running dry is not a reason to block.
          (vs, closed) <- flushTBMQueue q <|> pure ([], False)
          pure (v : vs, closed)

    -- Take requested hashes off the queue one at a time, load the matching comment or
    -- revision from the codebase and send it. Stops when the queue is closed and
    -- drained, or as soon as a send reports failure.
    --
    -- NOTE(review): the send runs inside the SQLite transaction via 'Sqlite.unsafeIO';
    -- if the transaction can be re-run, the chunk would be sent again — confirm.
    uploaderWorker ::
      Codebase.Codebase IO v a ->
      ( MsgOrError err HistoryCommentUploaderChunk ->
        STM Bool
      ) ->
      TBMQueue (Either HistoryCommentHash32 HistoryCommentRevisionHash32) ->
      IO ()
    uploaderWorker codebase send commentHashesToUploadQ = do
      let loop = do
            hash <- MaybeT $ atomically (readTBMQueue commentHashesToUploadQ)
            mapMaybeT (Codebase.runTransaction codebase) $ do
              case hash of
                Left (HistoryCommentHash32 commentHash) -> do
                  Debug.debugM Debug.HistoryComments "Uploading comment for hash" commentHash
                  commentId <- lift $ Q.expectHistoryCommentIdByHash32 commentHash
                  comment <- lift $ Q.expectHistoryCommentById commentId
                  success <- lift $ Sqlite.unsafeIO $ atomically $ send (Msg $ intoChunk (Left comment))
                  when (not success) $ Debug.debugLogM Debug.HistoryComments "Failed to send the history comment, shutting down"
                  guard success
                Right (HistoryCommentRevisionHash32 revisionHash) -> do
                  revisionId <- lift $ Q.expectHistoryCommentRevisionIdByHash32 revisionHash
                  revision <- lift $ Q.expectHistoryCommentRevisionById revisionId
                  success <- lift $ Sqlite.unsafeIO $ atomically $ send (Msg $ intoChunk (Right revision))
                  when (not success) $ Debug.debugLogM Debug.HistoryComments "Failed to send history comment revision, shutting down"
                  guard success
            loop
      void . runMaybeT $ loop

    -- Handle messages coming back from the server:
    --
    -- * RequestCommentsChunk: queue the requested hashes for the uploader;
    -- * DoneCheckingHashesChunk: close the upload queue (no more requests will come);
    -- * DeserialiseFailure: record the error and stop;
    -- * UserErr: record the error and keep listening;
    -- * Nothing from @receive@: stop.
    --
    -- This worker is the only writer of the upload queue, so the queue is closed on
    -- every path that leaves the loop; otherwise the uploader (and the thread awaiting
    -- it) would block forever on a queue that can no longer receive anything.
    receiverWorker ::
      STM (Maybe (MsgOrError UploadCommentsResponse HistoryCommentDownloaderChunk)) ->
      TBMQueue
        ( Either
            HistoryCommentHash32
            HistoryCommentRevisionHash32
        ) ->
      TMVar Text ->
      IO ()
    receiverWorker receive commentHashesToUploadQ errMVar = loop
      where
        -- Closing is idempotent, so calling this more than once is fine.
        stopUploads = atomically $ closeTBMQueue commentHashesToUploadQ
        -- Keep the first error only; a plain putTMVar would block forever on a
        -- second error because the variable would already be full.
        recordError msg = void . atomically $ tryPutTMVar errMVar msg
        loop =
          atomically receive >>= \case
            Nothing -> stopUploads
            Just (Msg DoneCheckingHashesChunk) -> do
              stopUploads
              loop
            Just (Msg (RequestCommentsChunk comments)) -> do
              -- One transaction per hash: writing the whole set in a single
              -- transaction could never commit when the set is larger than the
              -- queue's free capacity.
              for_ comments (atomically . writeTBMQueue commentHashesToUploadQ)
              loop
            Just (DeserialiseFailure msg) -> do
              recordError $ "uploadHistoryComments: deserialisation failure: " <> msg
              stopUploads
            Just (UserErr err) -> do
              recordError $ "uploadHistoryComments: server error: " <> tShow err
              loop

    -- Batch up queued hashes and announce them to the server as
    -- PossiblyNewHashesChunk messages. When the queue is closed, or a send reports
    -- failure, send DoneSendingHashesChunk and stop.
    hashNotifyWorker :: (MsgOrError Void HistoryCommentUploaderChunk -> STM Bool) -> TBMQueue (HistoryCommentHash32, [HistoryCommentRevisionHash32]) -> IO ()
    hashNotifyWorker send q = do
      let loop = do
            finished <- atomically $ do
              (hashesToCheck, isClosed) <- flushTBMQueue q
              All sentSuccessfully <-
                NEL.nonEmpty hashesToCheck & foldMapM \possiblyNewHashes -> do
                  Debug.debugM Debug.HistoryComments "Sending possibly new hashes:" possiblyNewHashes
                  All <$> (send $ Msg $ PossiblyNewHashesChunk possiblyNewHashes)
              when (isClosed || not sentSuccessfully) $
                Debug.debugLogM Debug.HistoryComments "Hash notify worker: queue closed or server closed connection, no longer sending hashes"
              pure (isClosed || not sentSuccessfully)
            if finished
              then do
                -- Nobody reads the queue after this point. Closing it turns later
                -- writes into no-ops so the producer cannot block on a full queue.
                atomically $ closeTBMQueue q
                void . atomically $ send (Msg DoneSendingHashesChunk)
              else loop
      loop

    -- Convert a comment (Left) or comment revision (Right) loaded from the codebase
    -- into the corresponding upload chunk, unwrapping the author thumbprint and
    -- renaming the hash fields to the names used by the Share types.
    intoChunk = \case
      Left
        ( HC.HistoryComment
            { author,
              createdAt,
              authorThumbprint = KeyThumbprint authorThumbprint,
              causal,
              commentId = commentHash
            }
          ) ->
          HistoryCommentChunk
            Share.HistoryComment
              { author,
                createdAt,
                authorThumbprint,
                Share.causalHash = causal,
                commentHash
              }
      Right
        ( HC.HistoryCommentRevision
            { subject,
              content,
              createdAt,
              comment = commentHash,
              isHidden,
              authorSignature,
              revisionId
            }
          ) ->
          HistoryCommentRevisionChunk
            Share.HistoryCommentRevision
              { subject,
                content,
                createdAt,
                isHidden,
                authorSignature,
                revisionHash = revisionId,
                commentHash
              }
-- | Collect up to @size@ values from a blocking STM source in one transaction.
--
-- @action@ is expected to behave like 'readTBMQueue': retry while nothing is
-- available, yield @Just v@ for a value, and yield 'Nothing' once the source is
-- finished.
--
-- The result pairs the values read (in order) with a flag that is 'True' when
-- @action@ reported 'Nothing'. For a positive @size@ the transaction retries until
-- at least one value, or the end of the source, is available; after the first value
-- it stops early rather than wait for more. A @size@ of @0@ returns @([], False)@
-- immediately. Every value read is also written to the history-comments debug log.
fetchChunk :: (Show a) => Int -> STM (Maybe a) -> STM ([a], Bool)
fetchChunk size action = collect size
  where
    collect 0 = pure ([], False)
    collect remaining =
      action >>= \case
        Nothing -> pure ([], True)
        Just val -> do
          Debug.debugM Debug.HistoryComments "Fetched value from queue" val
          -- We already hold a value, so an empty source ends the chunk instead of
          -- blocking the whole transaction.
          (rest, exhausted) <- collect (remaining - 1) <|> pure ([], False)
          pure (val : rest, exhausted)
downloadHistoryComments ::
Codeserver.CodeserverURI ->
RepoInfo ->
Cli ()
CodeserverURI
codeserver RepoInfo
repoInfo
| Bool -> Bool
not Bool
shouldSyncHistoryComments = () -> Cli ()
forall a. a -> Cli a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
| Bool
otherwise = do
Cli.Env {codebase, credentialManager} <- Cli Env
forall r (m :: * -> *). MonadReader r m => m r
ask
let path = String
"/ucm/v1/history-comments/download?branchRef=" String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Text -> String
Text.unpack (RepoInfo -> Text
forall a. ToHttpApiData a => a -> Text
toQueryParam RepoInfo
repoInfo)
let tokenProvider = CredentialManager -> TokenProvider
newTokenProvider CredentialManager
credentialManager
result <- liftIO $ withCodeserverWebsocket @IO @(MsgOrError Void HistoryCommentDownloaderChunk) @(MsgOrError DownloadCommentsResponse HistoryCommentUploaderChunk) msgBufferSize codeserver tokenProvider path \Queues {MsgOrError Void HistoryCommentDownloaderChunk -> STM Bool
send :: forall i o. Queues i o -> i -> STM Bool
send :: MsgOrError Void HistoryCommentDownloaderChunk -> STM Bool
send, STM
(Maybe
(MsgOrError DownloadCommentsResponse HistoryCommentUploaderChunk))
receive :: forall i o. Queues i o -> STM (Maybe o)
receive :: STM
(Maybe
(MsgOrError DownloadCommentsResponse HistoryCommentUploaderChunk))
receive} -> (Scope -> IO ()) -> IO ()
forall a. (Scope -> IO a) -> IO a
Ki.scoped \Scope
scope -> do
hashesToCheckQ <- IO
(TBMQueue (HistoryCommentHash32, [HistoryCommentRevisionHash32]))
-> IO
(TBMQueue (HistoryCommentHash32, [HistoryCommentRevisionHash32]))
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO
(TBMQueue (HistoryCommentHash32, [HistoryCommentRevisionHash32]))
-> IO
(TBMQueue (HistoryCommentHash32, [HistoryCommentRevisionHash32])))
-> IO
(TBMQueue (HistoryCommentHash32, [HistoryCommentRevisionHash32]))
-> IO
(TBMQueue (HistoryCommentHash32, [HistoryCommentRevisionHash32]))
forall a b. (a -> b) -> a -> b
$ forall a. Int -> IO (TBMQueue a)
newTBMQueueIO @(HistoryCommentHash32, [HistoryCommentRevisionHash32]) Int
100
commentsQ <- liftIO $ newTBMQueueIO 100
errMVar <- liftIO newEmptyTMVarIO
_receiverThread <- liftIO $ Ki.fork scope $ receiverWorker receive errMVar hashesToCheckQ commentsQ
inserterThread <- liftIO $ Ki.fork scope $ inserterWorker codebase commentsQ
_hashCheckingThread <- liftIO $ Ki.fork scope $ hashCheckingWorker codebase send hashesToCheckQ
Debug.debugLogM Debug.HistoryComments "Downloading history comments: waiting for inserter thread to finish"
atomically $ Ki.await inserterThread
case result of
Left ConnectionException
connException -> String -> Cli ()
forall a. HasCallStack => String -> a
error (String -> Cli ()) -> String -> Cli ()
forall a b. (a -> b) -> a -> b
$ String
"downloadHistoryComments: " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> ConnectionException -> String
forall a. Show a => a -> String
show ConnectionException
connException
Right ((), [MsgOrError DownloadCommentsResponse HistoryCommentUploaderChunk]
_leftovers) -> () -> Cli ()
forall a. a -> Cli a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
where
inserterWorker ::
Codebase.Codebase IO v a ->
TBMQueue (Either HistoryComment HistoryCommentRevision) ->
IO ()
inserterWorker :: forall v a.
Codebase IO v a
-> TBMQueue (Either HistoryComment HistoryCommentRevision) -> IO ()
inserterWorker Codebase IO v a
codebase TBMQueue (Either HistoryComment HistoryCommentRevision)
commentsQ = do
let loop :: IO ()
loop = do
(chunk, closed) <- STM ([Either HistoryComment HistoryCommentRevision], Bool)
-> IO ([Either HistoryComment HistoryCommentRevision], Bool)
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (Int
-> STM (Maybe (Either HistoryComment HistoryCommentRevision))
-> STM ([Either HistoryComment HistoryCommentRevision], Bool)
forall a. Show a => Int -> STM (Maybe a) -> STM ([a], Bool)
fetchChunk Int
insertCommentBatchSize (TBMQueue (Either HistoryComment HistoryCommentRevision)
-> STM (Maybe (Either HistoryComment HistoryCommentRevision))
forall a. TBMQueue a -> STM (Maybe a)
readTBMQueue TBMQueue (Either HistoryComment HistoryCommentRevision)
commentsQ))
when (not (null chunk)) $ do
Debug.debugM Debug.HistoryComments "Inserting comments chunk of size" (length chunk)
Codebase.runTransaction codebase $ do
for_ chunk \case
Left
HistoryComment
{ Text
author :: HistoryComment -> Text
author :: Text
author,
UTCTime
createdAt :: HistoryComment -> UTCTime
createdAt :: UTCTime
createdAt,
Text
authorThumbprint :: HistoryComment -> Text
authorThumbprint :: Text
authorThumbprint,
Hash32
causalHash :: HistoryComment -> Hash32
causalHash :: Hash32
causalHash,
Hash32
commentHash :: HistoryComment -> Hash32
commentHash :: Hash32
commentHash
} -> do
causalHashId <- Hash32 -> Transaction CausalHashId
Q.expectCausalHashIdForHash32 Hash32
causalHash
Q.insertHistoryComment
HC.HistoryComment
{ author,
createdAt,
authorThumbprint = KeyThumbprint authorThumbprint,
causal = causalHashId,
commentId = HistoryCommentHash $ into @Hash commentHash
}
Right
HistoryCommentRevision
{ Text
subject :: HistoryCommentRevision -> Text
subject :: Text
subject,
Text
content :: HistoryCommentRevision -> Text
content :: Text
content,
UTCTime
createdAt :: HistoryCommentRevision -> UTCTime
createdAt :: UTCTime
createdAt,
Bool
isHidden :: HistoryCommentRevision -> Bool
isHidden :: Bool
isHidden,
ByteString
authorSignature :: HistoryCommentRevision -> ByteString
authorSignature :: ByteString
authorSignature,
Hash32
revisionHash :: HistoryCommentRevision -> Hash32
revisionHash :: Hash32
revisionHash,
Hash32
commentHash :: HistoryCommentRevision -> Hash32
commentHash :: Hash32
commentHash
} -> do
HistoryCommentRevision
HistoryCommentRevisionHash UTCTime HistoryCommentHash
-> Transaction ()
Q.insertHistoryCommentRevision
HC.HistoryCommentRevision
{ Text
subject :: Text
subject :: Text
subject,
Text
content :: Text
content :: Text
content,
UTCTime
createdAt :: UTCTime
createdAt :: UTCTime
createdAt,
comment :: HistoryCommentHash
comment = Hash -> HistoryCommentHash
HistoryCommentHash (Hash -> HistoryCommentHash) -> Hash -> HistoryCommentHash
forall a b. (a -> b) -> a -> b
$ forall target source. From source target => source -> target
into @Hash Hash32
commentHash,
isHidden :: Bool
isHidden = Bool
isHidden,
authorSignature :: ByteString
authorSignature = ByteString
authorSignature,
revisionId :: HistoryCommentRevisionHash
revisionId = Hash -> HistoryCommentRevisionHash
HistoryCommentRevisionHash (Hash -> HistoryCommentRevisionHash)
-> Hash -> HistoryCommentRevisionHash
forall a b. (a -> b) -> a -> b
$ forall target source. From source target => source -> target
into @Hash Hash32
revisionHash
}
when (not closed) loop
IO ()
loop
DebugFlag -> String -> IO ()
forall (m :: * -> *). Monad m => DebugFlag -> String -> m ()
Debug.debugLogM DebugFlag
Debug.HistoryComments String
"Inserter worker finished"
hashCheckingWorker ::
Codebase.Codebase IO v a ->
(MsgOrError err HistoryCommentDownloaderChunk -> STM Bool) ->
TBMQueue (HistoryCommentHash32, [HistoryCommentRevisionHash32]) ->
IO ()
hashCheckingWorker :: forall v a err.
Codebase IO v a
-> (MsgOrError err HistoryCommentDownloaderChunk -> STM Bool)
-> TBMQueue (HistoryCommentHash32, [HistoryCommentRevisionHash32])
-> IO ()
hashCheckingWorker Codebase IO v a
codebase MsgOrError err HistoryCommentDownloaderChunk -> STM Bool
send TBMQueue (HistoryCommentHash32, [HistoryCommentRevisionHash32])
hashesToCheckQ = do
let loop :: IO ()
loop = do
(hashes, closed) <- STM
([(HistoryCommentHash32, [HistoryCommentRevisionHash32])], Bool)
-> IO
([(HistoryCommentHash32, [HistoryCommentRevisionHash32])], Bool)
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (Int
-> STM
(Maybe (HistoryCommentHash32, [HistoryCommentRevisionHash32]))
-> STM
([(HistoryCommentHash32, [HistoryCommentRevisionHash32])], Bool)
forall a. Show a => Int -> STM (Maybe a) -> STM ([a], Bool)
fetchChunk Int
insertCommentBatchSize (TBMQueue (HistoryCommentHash32, [HistoryCommentRevisionHash32])
-> STM
(Maybe (HistoryCommentHash32, [HistoryCommentRevisionHash32]))
forall a. TBMQueue a -> STM (Maybe a)
readTBMQueue TBMQueue (HistoryCommentHash32, [HistoryCommentRevisionHash32])
hashesToCheckQ))
Debug.debugM Debug.HistoryComments "Checking hashes chunk of size" (length hashes)
when (not (null hashes)) $ do
unknownHashes <- do
Codebase.runTransaction codebase $ do
hashes & foldMapM \(HistoryCommentHash32 Hash32
commentHash, [HistoryCommentRevisionHash32]
revisionHashes) -> do
haveComment <- Hash32 -> Transaction Bool
Q.haveHistoryComment Hash32
commentHash
if haveComment
then do
revisionHashes & wither \(HistoryCommentRevisionHash32 Hash32
revisionHash) -> do
Hash32 -> Transaction Bool
Q.haveHistoryCommentRevision Hash32
revisionHash
Transaction Bool
-> (Bool
-> Maybe
(Either HistoryCommentHash32 HistoryCommentRevisionHash32))
-> Transaction
(Maybe (Either HistoryCommentHash32 HistoryCommentRevisionHash32))
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
Bool
True -> Maybe (Either HistoryCommentHash32 HistoryCommentRevisionHash32)
forall a. Maybe a
Nothing
Bool
False -> Either HistoryCommentHash32 HistoryCommentRevisionHash32
-> Maybe (Either HistoryCommentHash32 HistoryCommentRevisionHash32)
forall a. a -> Maybe a
Just (Either HistoryCommentHash32 HistoryCommentRevisionHash32
-> Maybe
(Either HistoryCommentHash32 HistoryCommentRevisionHash32))
-> Either HistoryCommentHash32 HistoryCommentRevisionHash32
-> Maybe (Either HistoryCommentHash32 HistoryCommentRevisionHash32)
forall a b. (a -> b) -> a -> b
$ HistoryCommentRevisionHash32
-> Either HistoryCommentHash32 HistoryCommentRevisionHash32
forall a b. b -> Either a b
Right (HistoryCommentRevisionHash32
-> Either HistoryCommentHash32 HistoryCommentRevisionHash32)
-> HistoryCommentRevisionHash32
-> Either HistoryCommentHash32 HistoryCommentRevisionHash32
forall a b. (a -> b) -> a -> b
$ Hash32 -> HistoryCommentRevisionHash32
HistoryCommentRevisionHash32 (Hash32 -> HistoryCommentRevisionHash32)
-> Hash32 -> HistoryCommentRevisionHash32
forall a b. (a -> b) -> a -> b
$ Hash32
revisionHash
else do
pure (pure (Left $ HistoryCommentHash32 commentHash) <> (Right <$> revisionHashes))
case NESet.nonEmptySet (Set.fromList unknownHashes) of
Maybe
(NESet (Either HistoryCommentHash32 HistoryCommentRevisionHash32))
Nothing -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Just NESet (Either HistoryCommentHash32 HistoryCommentRevisionHash32)
unknownHashesSet -> do
IO Bool -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Bool -> IO ()) -> (STM Bool -> IO Bool) -> STM Bool -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM Bool -> IO Bool
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM Bool -> IO ()) -> STM Bool -> IO ()
forall a b. (a -> b) -> a -> b
$ MsgOrError err HistoryCommentDownloaderChunk -> STM Bool
send (MsgOrError err HistoryCommentDownloaderChunk -> STM Bool)
-> MsgOrError err HistoryCommentDownloaderChunk -> STM Bool
forall a b. (a -> b) -> a -> b
$ HistoryCommentDownloaderChunk
-> MsgOrError err HistoryCommentDownloaderChunk
forall err a. a -> MsgOrError err a
Msg (HistoryCommentDownloaderChunk
-> MsgOrError err HistoryCommentDownloaderChunk)
-> HistoryCommentDownloaderChunk
-> MsgOrError err HistoryCommentDownloaderChunk
forall a b. (a -> b) -> a -> b
$ NESet (Either HistoryCommentHash32 HistoryCommentRevisionHash32)
-> HistoryCommentDownloaderChunk
RequestCommentsChunk NESet (Either HistoryCommentHash32 HistoryCommentRevisionHash32)
unknownHashesSet
when (not closed) loop
IO ()
loop
IO Bool -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Bool -> IO ()) -> (STM Bool -> IO Bool) -> STM Bool -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM Bool -> IO Bool
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM Bool -> IO ()) -> STM Bool -> IO ()
forall a b. (a -> b) -> a -> b
$ MsgOrError err HistoryCommentDownloaderChunk -> STM Bool
send (MsgOrError err HistoryCommentDownloaderChunk -> STM Bool)
-> MsgOrError err HistoryCommentDownloaderChunk -> STM Bool
forall a b. (a -> b) -> a -> b
$ HistoryCommentDownloaderChunk
-> MsgOrError err HistoryCommentDownloaderChunk
forall err a. a -> MsgOrError err a
Msg (HistoryCommentDownloaderChunk
-> MsgOrError err HistoryCommentDownloaderChunk)
-> HistoryCommentDownloaderChunk
-> MsgOrError err HistoryCommentDownloaderChunk
forall a b. (a -> b) -> a -> b
$ HistoryCommentDownloaderChunk
DoneCheckingHashesChunk
DebugFlag -> String -> IO ()
forall (m :: * -> *). Monad m => DebugFlag -> String -> m ()
Debug.debugLogM DebugFlag
Debug.HistoryComments String
"Hash checking worker finished"
-- | Receive messages from the server and route them to the local work queues.
--
-- Arguments, in order:
--
-- * @recv@: STM action yielding the next incoming message, or 'Nothing' once
--   the stream has ended.
-- * @errMVar@: filled with a human-readable message when the server reports an
--   error or a message cannot be deserialised.
-- * @hashesToCheckQ@: receives every @(comment hash, revision hashes)@ pair the
--   server announces.
-- * @commentsQ@: receives every comment ('Left') and comment revision
--   ('Right') the server sends.
--
-- The worker stops when the stream ends (both queues are closed first) or when
-- an error is reported via @errMVar@.
--
-- NOTE(review): on the two error paths the queues are left open; presumably
-- the caller watches @errMVar@ and tears the workers down -- confirm.
receiverWorker ::
  STM (Maybe (MsgOrError DownloadCommentsResponse HistoryCommentUploaderChunk)) ->
  TMVar Text ->
  TBMQueue
    ( HistoryCommentHash32,
      [HistoryCommentRevisionHash32]
    ) ->
  TBMQueue (Either HistoryComment HistoryCommentRevision) ->
  IO ()
receiverWorker recv errMVar hashesToCheckQ commentsQ = do
  let loop :: IO ()
      loop = do
        -- Each transaction consumes exactly one message and returns the IO
        -- action to run next: either a continuation of the loop or a no-op
        -- that ends it.
        next <- atomically do
          recv >>= \case
            Nothing -> do
              -- End of stream: nothing more will arrive for either queue.
              closeTBMQueue hashesToCheckQ
              closeTBMQueue commentsQ
              pure (pure ())
            Just (DeserialiseFailure err) -> do
              putTMVar errMVar $ "downloadHistoryComments: deserialisation failure: " <> err
              pure (pure ())
            Just (UserErr err) -> do
              putTMVar errMVar $ "downloadHistoryComments: server error: " <> tShow err
              pure (pure ())
            Just (Msg msg) ->
              case msg of
                PossiblyNewHashesChunk hashesToCheck ->
                  -- Enqueue each entry in its own transaction, outside the
                  -- receiving one. 'writeTBMQueue' retries while the bounded
                  -- queue is full, and writes made inside a transaction are
                  -- invisible to the consumer until it commits, so writing a
                  -- whole chunk atomically can never succeed when the chunk
                  -- holds more entries than the queue has room for.
                  pure $ do
                    for_ hashesToCheck $ \h ->
                      atomically (writeTBMQueue hashesToCheckQ h)
                    loop
                DoneSendingHashesChunk -> do
                  closeTBMQueue hashesToCheckQ
                  pure loop
                HistoryCommentChunk comment -> do
                  writeTBMQueue commentsQ (Left comment)
                  pure loop
                HistoryCommentRevisionChunk revision -> do
                  writeTBMQueue commentsQ (Right revision)
                  pure loop
        next
  loop
  Debug.debugLogM Debug.HistoryComments "Receiver worker finished"
-- | Batch size constant, fixed at 100.
--
-- NOTE(review): judging by the name, this is the number of comments inserted
-- per batch; the use site is outside this view -- confirm.
insertCommentBatchSize :: Int
insertCommentBatchSize = 100