module Unison.Share.HistoryComments
  ( uploadHistoryComments,
    downloadHistoryComments,
  )
where

import Control.Concurrent.STM.TBMQueue (TBMQueue, closeTBMQueue, newTBMQueueIO, readTBMQueue, writeTBMQueue)
import Control.Monad.Reader
import Control.Monad.Trans.Maybe (mapMaybeT)
import Data.List.NonEmpty qualified as NEL
import Data.Monoid (All (..))
import Data.Set qualified as Set
import Data.Set.NonEmpty qualified as NESet
import Data.Text qualified as Text
import Data.Void
import Ki qualified
import Servant.API
import System.IO.Unsafe (unsafePerformIO)
import U.Codebase.Sqlite.Queries qualified as Q
import Unison.Auth.Tokens (newTokenProvider)
import Unison.Cli.Monad
import Unison.Cli.Monad qualified as Cli
import Unison.Codebase qualified as Codebase
import Unison.Debug qualified as Debug
import Unison.Hash (Hash)
import Unison.Hash32 (Hash32)
import Unison.Hash32 qualified as Hash32
import Unison.HashTags
import Unison.HistoryComment qualified as HC
import Unison.KeyThumbprint (KeyThumbprint (KeyThumbprint))
import Unison.Prelude
import Unison.Server.HistoryComments.Types
import Unison.Server.HistoryComments.Types qualified as Share
import Unison.Share.Codeserver qualified as Codeserver
import Unison.Sqlite qualified as Sqlite
import Unison.Sync.Types (RepoInfo)
import Unison.Util.Monoid (foldMapM)
import Unison.Util.Websockets
import UnliftIO.Environment (lookupEnv)
import UnliftIO.STM

-- | Number of comment chunks that can be queued up in the websockets buffer.
--
-- Passed as the buffer-size argument of 'withCodeserverWebsocket'.
msgBufferSize :: Int
msgBufferSize = 20

-- | Name of the environment variable consulted to decide whether history
-- comments are synced at all.
syncHistoryCommentsEnvKey :: String
syncHistoryCommentsEnvKey = "UNISON_SYNC_HISTORY_COMMENTS"

-- | Whether history-comment syncing is switched on.
--
-- 'True' only when the environment variable named by 'syncHistoryCommentsEnvKey'
-- is set to exactly @"true"@; unset or any other value yields 'False'.
--
-- The environment is read through 'unsafePerformIO'; the @NOINLINE@ pragma below
-- keeps this a single shared top-level value rather than letting the lookup be
-- duplicated at use sites.
shouldSyncHistoryComments :: Bool
shouldSyncHistoryComments = unsafePerformIO $ do
  lookupEnv syncHistoryCommentsEnvKey <&> \case
    Just "true" -> True
    _ -> False
{-# NOINLINE shouldSyncHistoryComments #-}

-- | Upload the history comments attached to a local causal to a codeserver.
--
-- This is a no-op unless 'shouldSyncHistoryComments' is 'True'.
--
-- Otherwise a websocket is opened to the upload endpoint and three workers are
-- forked inside a 'Ki.scoped' block:
--
-- * @hashNotifyWorker@ forwards batches of (comment hash, revision hashes) to the server,
-- * @receiverWorker@ turns the server's requests into entries on the upload queue,
-- * @uploaderWorker@ sends each requested comment / revision.
--
-- The calling thread streams the comment hashes for the causal out of the
-- codebase, closes the hash queue, and then waits until either the uploader
-- finishes or the receiver reports an error.
--
-- A 'ConnectionException' from the websocket is raised with 'error'.
uploadHistoryComments ::
  -- | The local branch causal to upload comments for.
  Hash32 ->
  -- | The Unison Share URL.
  Codeserver.CodeserverURI ->
  -- | The remote branch to upload for.
  RepoInfo ->
  Cli ()
uploadHistoryComments rootCausalHash32 codeserver repoInfo
  | not shouldSyncHistoryComments = pure ()
  | otherwise = do
      Cli.Env {codebase, credentialManager} <- ask
      let path = "/ucm/v1/history-comments/upload?branchRef=" <> Text.unpack (toQueryParam repoInfo)
      -- Credentials for the websocket handshake come from the CLI's credential manager.
      let tokenProvider = newTokenProvider credentialManager
      result <-
        liftIO $
          withCodeserverWebsocket
            @IO
            @(MsgOrError Void HistoryCommentUploaderChunk)
            @(MsgOrError UploadCommentsResponse HistoryCommentDownloaderChunk)
            msgBufferSize
            codeserver
            tokenProvider
            path
            \Queues {send, receive} -> Ki.scoped \scope -> do
              commentHashesToSendQ <- newTBMQueueIO @(HistoryCommentHash32, [HistoryCommentRevisionHash32]) 100
              commentHashesToUploadQ <- newTBMQueueIO @(Either HistoryCommentHash32 HistoryCommentRevisionHash32) 100
              errMVar <- newEmptyTMVarIO
              _ <- Ki.fork scope (hashNotifyWorker send commentHashesToSendQ)
              uploaderThread <- Ki.fork scope (uploaderWorker codebase send commentHashesToUploadQ)
              _ <- Ki.fork scope (receiverWorker receive commentHashesToUploadQ errMVar)
              Codebase.runTransaction codebase $ do
                rootCausalHashId <- Q.expectCausalHashIdByCausalHash $ CausalHash $ Hash32.toHash rootCausalHash32
                Q.streamHistoryCommentsForCausal rootCausalHashId \getCommentIds -> do
                  let loop = do
                        queued <- runMaybeT $ do
                          (commentId, commentHash32) <- MaybeT $ getCommentIds
                          revisionHashes <- lift $ Q.commentRevisionHashes commentId
                          Debug.debugM Debug.HistoryComments "Queueing comment for checking" commentHash32
                          -- NOTE(review): this enqueue runs inside the transaction via unsafeIO;
                          -- confirm the transaction cannot be re-run here, or hashes may be queued twice.
                          lift . Sqlite.unsafeIO $
                            atomically $
                              writeTBMQueue
                                commentHashesToSendQ
                                (HistoryCommentHash32 commentHash32, HistoryCommentRevisionHash32 <$> revisionHashes)
                        -- Keep going until 'getCommentIds' yields 'Nothing', i.e. we run out of comments.
                        case queued of
                          Just () -> loop
                          Nothing -> pure ()
                  loop
              -- Close the hashes queue to signal we don't have any more hashes to announce.
              atomically $ closeTBMQueue commentHashesToSendQ
              -- Now we wait for the uploader to finish sending everything the server asked for,
              -- or for the receiver to report a failure. Without the error branch a server-side
              -- error would leave us waiting on the uploader forever.
              Debug.debugLogM Debug.HistoryComments "Uploading history comments: waiting for uploader thread to finish"
              mayErr <-
                atomically $
                  (Just <$> readTMVar errMVar) <|> (Nothing <$ Ki.await uploaderThread)
              -- TODO(review): the error is only visible in debug output; decide how it should
              -- be surfaced to the user.
              for_ mayErr \err ->
                Debug.debugM Debug.HistoryComments "Upload of history comments aborted" err
              -- Leaving the scope ends the remaining workers; the connection is closed afterwards.
              Debug.debugLogM Debug.HistoryComments "Done; closing connection"
      case result of
        Left err -> error $ "uploadCommentsClient: " <> show err
        Right ((), _leftovers {- Messages sent by server after we finished. -}) -> pure ()
  where
    -- Read all available values from a TBMQueue, returning them and whether the queue is closed.
    --
    -- If the very first read finds nothing (the queue is open but empty) the whole STM
    -- action retries via 'empty', so callers block until there is at least one value or
    -- the queue is closed. Once one value has been read, further reads are best-effort:
    -- running dry simply ends the batch with @closed = False@.
    flushTBMQueue :: TBMQueue a -> STM ([a], Bool)
    flushTBMQueue q = do
      optional (readTBMQueue q) >>= \case
        -- No values available
        Nothing -> empty
        Just Nothing -> do
          -- Queue closed
          pure ([], True)
        Just (Just v) -> do
          -- Keep draining; if the recursive call would block, stop with what we have.
          (vs, closed) <- flushTBMQueue q <|> pure ([], False)
          pure (v : vs, closed)
    -- Sends every comment / comment revision that appears on the upload queue.
    --
    -- For each hash read from the queue the corresponding record is loaded from the
    -- codebase and sent as a chunk. The worker stops when the queue is closed and drained
    -- ('readTBMQueue' yields 'Nothing') or as soon as a send reports failure.
    --
    -- The send happens after the database transaction has returned rather than inside it
    -- (via 'Sqlite.unsafeIO'): a message send is not idempotent, so it must not be repeated
    -- if the transaction were to run more than once, and the transaction should not stay
    -- open while we wait for space in the send buffer.
    uploaderWorker ::
      Codebase.Codebase IO v a ->
      ( MsgOrError err HistoryCommentUploaderChunk ->
        STM Bool
      ) ->
      TBMQueue (Either HistoryCommentHash32 HistoryCommentRevisionHash32) ->
      IO ()
    uploaderWorker codebase send commentHashesToUploadQ = do
      let loop = do
            hash <- MaybeT $ atomically (readTBMQueue commentHashesToUploadQ)
            -- Load the requested record and convert it to its wire representation.
            chunk <- lift . Codebase.runTransaction codebase $ case hash of
              Left (HistoryCommentHash32 commentHash) -> do
                Debug.debugM Debug.HistoryComments "Uploading comment for hash" commentHash
                commentId <- Q.expectHistoryCommentIdByHash32 commentHash
                intoChunk . Left <$> Q.expectHistoryCommentById commentId
              Right (HistoryCommentRevisionHash32 revisionHash) -> do
                revisionId <- Q.expectHistoryCommentRevisionIdByHash32 revisionHash
                intoChunk . Right <$> Q.expectHistoryCommentRevisionById revisionId
            let failureMsg = case hash of
                  Left _ -> "Failed to send the history comment, shutting down"
                  Right _ -> "Failed to send history comment revision, shutting down"
            success <- lift . atomically $ send (Msg chunk)
            when (not success) $ Debug.debugLogM Debug.HistoryComments failureMsg
            -- A failed send ends the worker.
            guard success
            loop
      void . runMaybeT $ loop

    -- Handles messages coming back from the server.
    --
    -- * 'RequestCommentsChunk': the requested hashes are put on the upload queue.
    -- * 'DoneCheckingHashesChunk': the server will request nothing more, so the upload
    --   queue is closed and the uploader can finish once it has drained it.
    -- * 'DeserialiseFailure' / 'UserErr': the error text is stored in @errMVar@ and the
    --   worker stops.
    -- * 'Nothing' from @receive@: the channel is closed and the worker stops.
    --
    -- Every path that stops the worker also closes the upload queue; otherwise the
    -- uploader would stay blocked on a queue that nobody will ever write to or close.
    receiverWorker ::
      STM (Maybe (MsgOrError UploadCommentsResponse HistoryCommentDownloaderChunk)) ->
      TBMQueue
        ( Either
            HistoryCommentHash32
            HistoryCommentRevisionHash32
        ) ->
      TMVar Text ->
      IO ()
    receiverWorker receive commentHashesToUploadQ errMVar = do
      let closeUploadQueue = atomically $ closeTBMQueue commentHashesToUploadQ
      let loop = do
            msgOrError <- atomically receive
            case msgOrError of
              -- Channel closed, shut down
              Nothing -> closeUploadQueue
              Just (Msg msg) -> case msg of
                DoneCheckingHashesChunk -> do
                  -- Notify that the server is done requesting comments
                  closeUploadQueue
                  loop
                RequestCommentsChunk comments -> do
                  -- One transaction per item: a single transaction writing the whole set
                  -- could never commit if the set is larger than the queue's capacity.
                  for_ comments $ atomically . writeTBMQueue commentHashesToUploadQ
                  loop
              Just (DeserialiseFailure msg) -> do
                atomically $ putTMVar errMVar $ "uploadHistoryComments: deserialisation failure: " <> msg
                closeUploadQueue
              Just (UserErr err) -> do
                atomically $ putTMVar errMVar $ "uploadHistoryComments: server error: " <> tShow err
                closeUploadQueue
      loop

    -- Forwards batches of locally known comment hashes to the server.
    --
    -- Each iteration drains whatever is currently on the queue (blocking until there is
    -- something, or the queue is closed) and sends it as one 'PossiblyNewHashesChunk'.
    -- The worker finishes when the queue has been closed or a send reports failure; it
    -- then makes a final attempt to send 'DoneSendingHashesChunk'.
    --
    -- On finishing, the queue is closed so that a producer still writing to it is not
    -- left blocked on a full queue that is no longer being consumed.
    hashNotifyWorker :: (MsgOrError Void HistoryCommentUploaderChunk -> STM Bool) -> TBMQueue (HistoryCommentHash32, [HistoryCommentRevisionHash32]) -> IO ()
    hashNotifyWorker send q = do
      let loop = do
            finished <- atomically $ do
              (hashesToCheck, isClosed) <- flushTBMQueue q
              -- Nothing to send (empty batch) counts as success: 'mempty' for 'All' is 'True'.
              All sentSuccessfully <-
                NEL.nonEmpty hashesToCheck & foldMapM \possiblyNewHashes -> do
                  Debug.debugM Debug.HistoryComments "Sending possibly new hashes:" possiblyNewHashes
                  All <$> (send $ Msg $ PossiblyNewHashesChunk possiblyNewHashes)
              let done = isClosed || not sentSuccessfully
              when done $ do
                Debug.debugLogM Debug.HistoryComments "Hash notify worker: queue closed or server closed connection, no longer sending hashes"
                -- No-op if already closed; otherwise stops producers from blocking on us.
                closeTBMQueue q
              pure done
            if finished
              then do
                -- We're finished announcing hashes: send a DoneSendingHashesChunk to notify the server.
                void . atomically $ send (Msg DoneSendingHashesChunk)
              else loop
      loop
    intoChunk :: Either
  (HistoryComment UTCTime KeyThumbprint Hash32 Hash32)
  (HistoryCommentRevision Hash32 UTCTime Hash32)
-> HistoryCommentUploaderChunk
intoChunk = \case
      Left
        ( HC.HistoryComment
            { Text
author :: Text
$sel:author:HistoryComment :: forall createdAt thumbprint causal commentId.
HistoryComment createdAt thumbprint causal commentId -> Text
author,
              UTCTime
createdAt :: UTCTime
$sel:createdAt:HistoryComment :: forall createdAt thumbprint causal commentId.
HistoryComment createdAt thumbprint causal commentId -> createdAt
createdAt,
              $sel:authorThumbprint:HistoryComment :: forall createdAt thumbprint causal commentId.
HistoryComment createdAt thumbprint causal commentId -> thumbprint
authorThumbprint = KeyThumbprint Text
authorThumbprint,
              Hash32
causal :: Hash32
$sel:causal:HistoryComment :: forall createdAt thumbprint causal commentId.
HistoryComment createdAt thumbprint causal commentId -> causal
causal,
              $sel:commentId:HistoryComment :: forall createdAt thumbprint causal commentId.
HistoryComment createdAt thumbprint causal commentId -> commentId
commentId = Hash32
commentHash
            }
          ) ->
          HistoryComment -> HistoryCommentUploaderChunk
HistoryCommentChunk
            Share.HistoryComment
              { Text
author :: Text
$sel:author:HistoryComment :: Text
author,
                UTCTime
createdAt :: UTCTime
$sel:createdAt:HistoryComment :: UTCTime
createdAt,
                Text
authorThumbprint :: Text
$sel:authorThumbprint:HistoryComment :: Text
authorThumbprint,
                $sel:causalHash:HistoryComment :: Hash32
Share.causalHash = Hash32
causal,
                Hash32
commentHash :: Hash32
$sel:commentHash:HistoryComment :: Hash32
commentHash
              }
      Right
        ( HC.HistoryCommentRevision
            { Text
subject :: Text
$sel:subject:HistoryCommentRevision :: forall revisionId createdAt comment.
HistoryCommentRevision revisionId createdAt comment -> Text
subject,
              Text
content :: Text
$sel:content:HistoryCommentRevision :: forall revisionId createdAt comment.
HistoryCommentRevision revisionId createdAt comment -> Text
content,
              UTCTime
createdAt :: UTCTime
$sel:createdAt:HistoryCommentRevision :: forall revisionId createdAt comment.
HistoryCommentRevision revisionId createdAt comment -> createdAt
createdAt,
              $sel:comment:HistoryCommentRevision :: forall revisionId createdAt comment.
HistoryCommentRevision revisionId createdAt comment -> comment
comment = Hash32
commentHash,
              Bool
isHidden :: Bool
$sel:isHidden:HistoryCommentRevision :: forall revisionId createdAt comment.
HistoryCommentRevision revisionId createdAt comment -> Bool
isHidden,
              ByteString
authorSignature :: ByteString
$sel:authorSignature:HistoryCommentRevision :: forall revisionId createdAt comment.
HistoryCommentRevision revisionId createdAt comment -> ByteString
authorSignature,
              Hash32
revisionId :: Hash32
$sel:revisionId:HistoryCommentRevision :: forall revisionId createdAt comment.
HistoryCommentRevision revisionId createdAt comment -> revisionId
revisionId
            }
          ) ->
          HistoryCommentRevision -> HistoryCommentUploaderChunk
HistoryCommentRevisionChunk
            Share.HistoryCommentRevision
              { Text
subject :: Text
$sel:subject:HistoryCommentRevision :: Text
subject,
                Text
content :: Text
$sel:content:HistoryCommentRevision :: Text
content,
                UTCTime
createdAt :: UTCTime
$sel:createdAt:HistoryCommentRevision :: UTCTime
createdAt,
                Bool
isHidden :: Bool
$sel:isHidden:HistoryCommentRevision :: Bool
isHidden,
                ByteString
authorSignature :: ByteString
$sel:authorSignature:HistoryCommentRevision :: ByteString
authorSignature,
                $sel:revisionHash:HistoryCommentRevision :: Hash32
revisionHash = Hash32
revisionId,
                Hash32
commentHash :: Hash32
$sel:commentHash:HistoryCommentRevision :: Hash32
commentHash
              }

-- | Collect up to @size@ values from a closable source within a single STM
-- transaction.
--
-- @action@ is expected to behave like 'readTBMQueue' (which is what the callers
-- in this module pass): it retries while no value is available, yields
-- @Just v@ for a value, and yields @Nothing@ once the source has been closed.
--
-- The result is the list of values in the order they were read, paired with a
-- flag that is @True@ only if @action@ reported closure during this call.
--
-- Blocking behaviour:
--
-- * If nothing is available on the very first read, the whole transaction
--   retries, so the caller blocks until a value arrives or the source closes.
-- * Once at least one value has been read, running out of values simply ends
--   the chunk early with the flag set to @False@.
--
-- A non-positive @size@ returns @([], False)@ without running @action@.
fetchChunk :: (Show a) => Int -> STM (Maybe a) -> STM ([a], Bool)
fetchChunk size action = go size
  where
    go n
      -- Guard with @<=@ rather than matching the literal 0, so that a negative
      -- size cannot skip the base case and drain the source without bound.
      | n <= 0 = pure ([], False)
      | otherwise =
          optional action >>= \case
            -- No value available at the moment: fail this branch. At the top
            -- level that retries the transaction; in a nested call the failure
            -- is caught by the '<|>' below.
            Nothing -> empty
            -- The source is closed.
            Just Nothing -> pure ([], True)
            Just (Just val) -> do
              Debug.debugM Debug.HistoryComments "Fetched value from queue" val
              -- Keep what we already have if the rest of the chunk isn't
              -- available yet.
              (rest, exhausted) <- go (n - 1) <|> pure ([], False)
              pure (val : rest, exhausted)

downloadHistoryComments ::
  -- | The Unison Share URL.
  Codeserver.CodeserverURI ->
  -- | The remote branch to download history comments for.
  RepoInfo ->
  Cli ()
downloadHistoryComments :: CodeserverURI -> RepoInfo -> Cli ()
downloadHistoryComments CodeserverURI
codeserver RepoInfo
repoInfo
  | Bool -> Bool
not Bool
shouldSyncHistoryComments = () -> Cli ()
forall a. a -> Cli a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
  | Bool
otherwise = do
      Cli.Env {Codebase IO Symbol Ann
$sel:codebase:Env :: Env -> Codebase IO Symbol Ann
codebase :: Codebase IO Symbol Ann
codebase, CredentialManager
$sel:credentialManager:Env :: Env -> CredentialManager
credentialManager :: CredentialManager
credentialManager} <- Cli Env
forall r (m :: * -> *). MonadReader r m => m r
ask
      let path :: String
path = String
"/ucm/v1/history-comments/download?branchRef=" String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Text -> String
Text.unpack (RepoInfo -> Text
forall a. ToHttpApiData a => a -> Text
toQueryParam RepoInfo
repoInfo)
      -- Enable compression
      let tokenProvider :: TokenProvider
tokenProvider = CredentialManager -> TokenProvider
newTokenProvider CredentialManager
credentialManager
      Either
  ConnectionException
  ((),
   [MsgOrError DownloadCommentsResponse HistoryCommentUploaderChunk])
result <- IO
  (Either
     ConnectionException
     ((),
      [MsgOrError DownloadCommentsResponse HistoryCommentUploaderChunk]))
-> Cli
     (Either
        ConnectionException
        ((),
         [MsgOrError DownloadCommentsResponse HistoryCommentUploaderChunk]))
forall a. IO a -> Cli a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO
   (Either
      ConnectionException
      ((),
       [MsgOrError DownloadCommentsResponse HistoryCommentUploaderChunk]))
 -> Cli
      (Either
         ConnectionException
         ((),
          [MsgOrError
             DownloadCommentsResponse HistoryCommentUploaderChunk])))
-> IO
     (Either
        ConnectionException
        ((),
         [MsgOrError DownloadCommentsResponse HistoryCommentUploaderChunk]))
-> Cli
     (Either
        ConnectionException
        ((),
         [MsgOrError DownloadCommentsResponse HistoryCommentUploaderChunk]))
forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) i o r e.
(MonadUnliftIO m, WebSocketsData i, WebSocketsData o) =>
Int
-> CodeserverURI
-> (CodeserverId -> IO (Either e Text))
-> String
-> (Queues i o -> m r)
-> m (Either ConnectionException (r, [o]))
withCodeserverWebsocket @IO @(MsgOrError Void HistoryCommentDownloaderChunk) @(MsgOrError DownloadCommentsResponse HistoryCommentUploaderChunk) Int
msgBufferSize CodeserverURI
codeserver TokenProvider
tokenProvider String
path \Queues {MsgOrError Void HistoryCommentDownloaderChunk -> STM Bool
$sel:send:Queues :: forall i o. Queues i o -> i -> STM Bool
send :: MsgOrError Void HistoryCommentDownloaderChunk -> STM Bool
send, STM
  (Maybe
     (MsgOrError DownloadCommentsResponse HistoryCommentUploaderChunk))
$sel:receive:Queues :: forall i o. Queues i o -> STM (Maybe o)
receive :: STM
  (Maybe
     (MsgOrError DownloadCommentsResponse HistoryCommentUploaderChunk))
receive} -> (Scope -> IO ()) -> IO ()
forall a. (Scope -> IO a) -> IO a
Ki.scoped \Scope
scope -> do
        TBMQueue (HistoryCommentHash32, [HistoryCommentRevisionHash32])
hashesToCheckQ <- IO
  (TBMQueue (HistoryCommentHash32, [HistoryCommentRevisionHash32]))
-> IO
     (TBMQueue (HistoryCommentHash32, [HistoryCommentRevisionHash32]))
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO
   (TBMQueue (HistoryCommentHash32, [HistoryCommentRevisionHash32]))
 -> IO
      (TBMQueue (HistoryCommentHash32, [HistoryCommentRevisionHash32])))
-> IO
     (TBMQueue (HistoryCommentHash32, [HistoryCommentRevisionHash32]))
-> IO
     (TBMQueue (HistoryCommentHash32, [HistoryCommentRevisionHash32]))
forall a b. (a -> b) -> a -> b
$ forall a. Int -> IO (TBMQueue a)
newTBMQueueIO @(HistoryCommentHash32, [HistoryCommentRevisionHash32]) Int
100
        TBMQueue (Either HistoryComment HistoryCommentRevision)
commentsQ <- IO (TBMQueue (Either HistoryComment HistoryCommentRevision))
-> IO (TBMQueue (Either HistoryComment HistoryCommentRevision))
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (TBMQueue (Either HistoryComment HistoryCommentRevision))
 -> IO (TBMQueue (Either HistoryComment HistoryCommentRevision)))
-> IO (TBMQueue (Either HistoryComment HistoryCommentRevision))
-> IO (TBMQueue (Either HistoryComment HistoryCommentRevision))
forall a b. (a -> b) -> a -> b
$ Int -> IO (TBMQueue (Either HistoryComment HistoryCommentRevision))
forall a. Int -> IO (TBMQueue a)
newTBMQueueIO Int
100
        TMVar Text
errMVar <- IO (TMVar Text) -> IO (TMVar Text)
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (TMVar Text)
forall (m :: * -> *) a. MonadIO m => m (TMVar a)
newEmptyTMVarIO
        Thread ()
_receiverThread <- IO (Thread ()) -> IO (Thread ())
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Thread ()) -> IO (Thread ()))
-> IO (Thread ()) -> IO (Thread ())
forall a b. (a -> b) -> a -> b
$ Scope -> IO () -> IO (Thread ())
forall a. Scope -> IO a -> IO (Thread a)
Ki.fork Scope
scope (IO () -> IO (Thread ())) -> IO () -> IO (Thread ())
forall a b. (a -> b) -> a -> b
$ STM
  (Maybe
     (MsgOrError DownloadCommentsResponse HistoryCommentUploaderChunk))
-> TMVar Text
-> TBMQueue (HistoryCommentHash32, [HistoryCommentRevisionHash32])
-> TBMQueue (Either HistoryComment HistoryCommentRevision)
-> IO ()
receiverWorker STM
  (Maybe
     (MsgOrError DownloadCommentsResponse HistoryCommentUploaderChunk))
receive TMVar Text
errMVar TBMQueue (HistoryCommentHash32, [HistoryCommentRevisionHash32])
hashesToCheckQ TBMQueue (Either HistoryComment HistoryCommentRevision)
commentsQ
        Thread ()
inserterThread <- IO (Thread ()) -> IO (Thread ())
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Thread ()) -> IO (Thread ()))
-> IO (Thread ()) -> IO (Thread ())
forall a b. (a -> b) -> a -> b
$ Scope -> IO () -> IO (Thread ())
forall a. Scope -> IO a -> IO (Thread a)
Ki.fork Scope
scope (IO () -> IO (Thread ())) -> IO () -> IO (Thread ())
forall a b. (a -> b) -> a -> b
$ Codebase IO Symbol Ann
-> TBMQueue (Either HistoryComment HistoryCommentRevision) -> IO ()
forall v a.
Codebase IO v a
-> TBMQueue (Either HistoryComment HistoryCommentRevision) -> IO ()
inserterWorker Codebase IO Symbol Ann
codebase TBMQueue (Either HistoryComment HistoryCommentRevision)
commentsQ
        Thread ()
_hashCheckingThread <- IO (Thread ()) -> IO (Thread ())
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Thread ()) -> IO (Thread ()))
-> IO (Thread ()) -> IO (Thread ())
forall a b. (a -> b) -> a -> b
$ Scope -> IO () -> IO (Thread ())
forall a. Scope -> IO a -> IO (Thread a)
Ki.fork Scope
scope (IO () -> IO (Thread ())) -> IO () -> IO (Thread ())
forall a b. (a -> b) -> a -> b
$ Codebase IO Symbol Ann
-> (MsgOrError Void HistoryCommentDownloaderChunk -> STM Bool)
-> TBMQueue (HistoryCommentHash32, [HistoryCommentRevisionHash32])
-> IO ()
forall v a err.
Codebase IO v a
-> (MsgOrError err HistoryCommentDownloaderChunk -> STM Bool)
-> TBMQueue (HistoryCommentHash32, [HistoryCommentRevisionHash32])
-> IO ()
hashCheckingWorker Codebase IO Symbol Ann
codebase MsgOrError Void HistoryCommentDownloaderChunk -> STM Bool
send TBMQueue (HistoryCommentHash32, [HistoryCommentRevisionHash32])
hashesToCheckQ
        DebugFlag -> String -> IO ()
forall (m :: * -> *). Monad m => DebugFlag -> String -> m ()
Debug.debugLogM DebugFlag
Debug.HistoryComments String
"Downloading history comments: waiting for inserter thread to finish"
        -- The inserter thread will finish when the client closes the connection.
        STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ Thread () -> STM ()
forall a. Thread a -> STM a
Ki.await Thread ()
inserterThread
      case Either
  ConnectionException
  ((),
   [MsgOrError DownloadCommentsResponse HistoryCommentUploaderChunk])
result of
        Left ConnectionException
connException -> String -> Cli ()
forall a. HasCallStack => String -> a
error (String -> Cli ()) -> String -> Cli ()
forall a b. (a -> b) -> a -> b
$ String
"downloadHistoryComments: " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> ConnectionException -> String
forall a. Show a => a -> String
show ConnectionException
connException
        Right ((), [MsgOrError DownloadCommentsResponse HistoryCommentUploaderChunk]
_leftovers) -> () -> Cli ()
forall a. a -> Cli a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
  where
    -- Drains 'commentsQ' in batches of at most 'insertCommentBatchSize' and
    -- writes each non-empty batch to the codebase in its own transaction.
    -- Keeps going until 'fetchChunk' reports that the queue has been closed,
    -- then logs that the worker is done.
    inserterWorker ::
      Codebase.Codebase IO v a ->
      TBMQueue (Either HistoryComment HistoryCommentRevision) ->
      IO ()
    inserterWorker codebase commentsQ = do
      drainQueue
      Debug.debugLogM Debug.HistoryComments "Inserter worker finished"
      where
        -- Pull one batch, persist it, and recurse unless the queue is closed.
        drainQueue = do
          (batch, queueClosed) <-
            atomically $ fetchChunk insertCommentBatchSize (readTBMQueue commentsQ)
          when (not (null batch)) $ do
            Debug.debugM Debug.HistoryComments "Inserting comments chunk of size" (length batch)
            Codebase.runTransaction codebase (for_ batch insertEntry)
          when (not queueClosed) drainQueue

        -- Convert a single wire-format entry into its local representation and
        -- insert it. Comments need their causal hash resolved to a local id
        -- first; revisions only need their hashes re-tagged.
        insertEntry = \case
          Left HistoryComment {author, createdAt, authorThumbprint, causalHash, commentHash} -> do
            causalHashId <- Q.expectCausalHashIdForHash32 causalHash
            Q.insertHistoryComment
              HC.HistoryComment
                { author,
                  createdAt,
                  authorThumbprint = KeyThumbprint authorThumbprint,
                  causal = causalHashId,
                  commentId = HistoryCommentHash (into @Hash commentHash)
                }
          Right HistoryCommentRevision {subject, content, createdAt, isHidden, authorSignature, revisionHash, commentHash} ->
            Q.insertHistoryCommentRevision
              HC.HistoryCommentRevision
                { subject,
                  content,
                  createdAt,
                  comment = HistoryCommentHash (into @Hash commentHash),
                  isHidden,
                  authorSignature,
                  revisionId = HistoryCommentRevisionHash (into @Hash revisionHash)
                }

    hashCheckingWorker ::
      Codebase.Codebase IO v a ->
      (MsgOrError err HistoryCommentDownloaderChunk -> STM Bool) ->
      TBMQueue (HistoryCommentHash32, [HistoryCommentRevisionHash32]) ->
      IO ()
    hashCheckingWorker :: forall v a err.
Codebase IO v a
-> (MsgOrError err HistoryCommentDownloaderChunk -> STM Bool)
-> TBMQueue (HistoryCommentHash32, [HistoryCommentRevisionHash32])
-> IO ()
hashCheckingWorker Codebase IO v a
codebase MsgOrError err HistoryCommentDownloaderChunk -> STM Bool
send TBMQueue (HistoryCommentHash32, [HistoryCommentRevisionHash32])
hashesToCheckQ = do
      let loop :: IO ()
loop = do
            ([(HistoryCommentHash32, [HistoryCommentRevisionHash32])]
hashes, Bool
closed) <- STM
  ([(HistoryCommentHash32, [HistoryCommentRevisionHash32])], Bool)
-> IO
     ([(HistoryCommentHash32, [HistoryCommentRevisionHash32])], Bool)
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (Int
-> STM
     (Maybe (HistoryCommentHash32, [HistoryCommentRevisionHash32]))
-> STM
     ([(HistoryCommentHash32, [HistoryCommentRevisionHash32])], Bool)
forall a. Show a => Int -> STM (Maybe a) -> STM ([a], Bool)
fetchChunk Int
insertCommentBatchSize (TBMQueue (HistoryCommentHash32, [HistoryCommentRevisionHash32])
-> STM
     (Maybe (HistoryCommentHash32, [HistoryCommentRevisionHash32]))
forall a. TBMQueue a -> STM (Maybe a)
readTBMQueue TBMQueue (HistoryCommentHash32, [HistoryCommentRevisionHash32])
hashesToCheckQ))
            DebugFlag -> String -> Int -> IO ()
forall a (m :: * -> *).
(Show a, Monad m) =>
DebugFlag -> String -> a -> m ()
Debug.debugM DebugFlag
Debug.HistoryComments String
"Checking hashes chunk of size" ([(HistoryCommentHash32, [HistoryCommentRevisionHash32])] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [(HistoryCommentHash32, [HistoryCommentRevisionHash32])]
hashes)
            Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not ([(HistoryCommentHash32, [HistoryCommentRevisionHash32])] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [(HistoryCommentHash32, [HistoryCommentRevisionHash32])]
hashes)) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
              [Either HistoryCommentHash32 HistoryCommentRevisionHash32]
unknownHashes <- do
                Codebase IO v a
-> Transaction
     [Either HistoryCommentHash32 HistoryCommentRevisionHash32]
-> IO [Either HistoryCommentHash32 HistoryCommentRevisionHash32]
forall (m :: * -> *) v a b.
MonadIO m =>
Codebase m v a -> Transaction b -> m b
Codebase.runTransaction Codebase IO v a
codebase (Transaction
   [Either HistoryCommentHash32 HistoryCommentRevisionHash32]
 -> IO [Either HistoryCommentHash32 HistoryCommentRevisionHash32])
-> Transaction
     [Either HistoryCommentHash32 HistoryCommentRevisionHash32]
-> IO [Either HistoryCommentHash32 HistoryCommentRevisionHash32]
forall a b. (a -> b) -> a -> b
$ do
                  [(HistoryCommentHash32, [HistoryCommentRevisionHash32])]
hashes [(HistoryCommentHash32, [HistoryCommentRevisionHash32])]
-> ([(HistoryCommentHash32, [HistoryCommentRevisionHash32])]
    -> Transaction
         [Either HistoryCommentHash32 HistoryCommentRevisionHash32])
-> Transaction
     [Either HistoryCommentHash32 HistoryCommentRevisionHash32]
forall a b. a -> (a -> b) -> b
& ((HistoryCommentHash32, [HistoryCommentRevisionHash32])
 -> Transaction
      [Either HistoryCommentHash32 HistoryCommentRevisionHash32])
-> [(HistoryCommentHash32, [HistoryCommentRevisionHash32])]
-> Transaction
     [Either HistoryCommentHash32 HistoryCommentRevisionHash32]
forall (m :: * -> *) (f :: * -> *) b a.
(Monad m, Foldable f, Monoid b) =>
(a -> m b) -> f a -> m b
foldMapM \(HistoryCommentHash32 Hash32
commentHash, [HistoryCommentRevisionHash32]
revisionHashes) -> do
                    Bool
haveComment <- Hash32 -> Transaction Bool
Q.haveHistoryComment Hash32
commentHash
                    if Bool
haveComment
                      then do
                        [HistoryCommentRevisionHash32]
revisionHashes [HistoryCommentRevisionHash32]
-> ([HistoryCommentRevisionHash32]
    -> Transaction
         [Either HistoryCommentHash32 HistoryCommentRevisionHash32])
-> Transaction
     [Either HistoryCommentHash32 HistoryCommentRevisionHash32]
forall a b. a -> (a -> b) -> b
& (HistoryCommentRevisionHash32
 -> Transaction
      (Maybe (Either HistoryCommentHash32 HistoryCommentRevisionHash32)))
-> [HistoryCommentRevisionHash32]
-> Transaction
     [Either HistoryCommentHash32 HistoryCommentRevisionHash32]
forall (t :: * -> *) (f :: * -> *) a b.
(Witherable t, Applicative f) =>
(a -> f (Maybe b)) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f (Maybe b)) -> [a] -> f [b]
wither \(HistoryCommentRevisionHash32 Hash32
revisionHash) -> do
                          Hash32 -> Transaction Bool
Q.haveHistoryCommentRevision Hash32
revisionHash
                            Transaction Bool
-> (Bool
    -> Maybe
         (Either HistoryCommentHash32 HistoryCommentRevisionHash32))
-> Transaction
     (Maybe (Either HistoryCommentHash32 HistoryCommentRevisionHash32))
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
                              Bool
True -> Maybe (Either HistoryCommentHash32 HistoryCommentRevisionHash32)
forall a. Maybe a
Nothing
                              Bool
False -> Either HistoryCommentHash32 HistoryCommentRevisionHash32
-> Maybe (Either HistoryCommentHash32 HistoryCommentRevisionHash32)
forall a. a -> Maybe a
Just (Either HistoryCommentHash32 HistoryCommentRevisionHash32
 -> Maybe
      (Either HistoryCommentHash32 HistoryCommentRevisionHash32))
-> Either HistoryCommentHash32 HistoryCommentRevisionHash32
-> Maybe (Either HistoryCommentHash32 HistoryCommentRevisionHash32)
forall a b. (a -> b) -> a -> b
$ HistoryCommentRevisionHash32
-> Either HistoryCommentHash32 HistoryCommentRevisionHash32
forall a b. b -> Either a b
Right (HistoryCommentRevisionHash32
 -> Either HistoryCommentHash32 HistoryCommentRevisionHash32)
-> HistoryCommentRevisionHash32
-> Either HistoryCommentHash32 HistoryCommentRevisionHash32
forall a b. (a -> b) -> a -> b
$ Hash32 -> HistoryCommentRevisionHash32
HistoryCommentRevisionHash32 (Hash32 -> HistoryCommentRevisionHash32)
-> Hash32 -> HistoryCommentRevisionHash32
forall a b. (a -> b) -> a -> b
$ Hash32
revisionHash
                      else do
                        [Either HistoryCommentHash32 HistoryCommentRevisionHash32]
-> Transaction
     [Either HistoryCommentHash32 HistoryCommentRevisionHash32]
forall a. a -> Transaction a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either HistoryCommentHash32 HistoryCommentRevisionHash32
-> [Either HistoryCommentHash32 HistoryCommentRevisionHash32]
forall a. a -> [a]
forall (f :: * -> *) a. Applicative f => a -> f a
pure (HistoryCommentHash32
-> Either HistoryCommentHash32 HistoryCommentRevisionHash32
forall a b. a -> Either a b
Left (HistoryCommentHash32
 -> Either HistoryCommentHash32 HistoryCommentRevisionHash32)
-> HistoryCommentHash32
-> Either HistoryCommentHash32 HistoryCommentRevisionHash32
forall a b. (a -> b) -> a -> b
$ Hash32 -> HistoryCommentHash32
HistoryCommentHash32 Hash32
commentHash) [Either HistoryCommentHash32 HistoryCommentRevisionHash32]
-> [Either HistoryCommentHash32 HistoryCommentRevisionHash32]
-> [Either HistoryCommentHash32 HistoryCommentRevisionHash32]
forall a. Semigroup a => a -> a -> a
<> (HistoryCommentRevisionHash32
-> Either HistoryCommentHash32 HistoryCommentRevisionHash32
forall a b. b -> Either a b
Right (HistoryCommentRevisionHash32
 -> Either HistoryCommentHash32 HistoryCommentRevisionHash32)
-> [HistoryCommentRevisionHash32]
-> [Either HistoryCommentHash32 HistoryCommentRevisionHash32]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [HistoryCommentRevisionHash32]
revisionHashes))
              case Set (Either HistoryCommentHash32 HistoryCommentRevisionHash32)
-> Maybe
     (NESet (Either HistoryCommentHash32 HistoryCommentRevisionHash32))
forall a. Set a -> Maybe (NESet a)
NESet.nonEmptySet ([Either HistoryCommentHash32 HistoryCommentRevisionHash32]
-> Set (Either HistoryCommentHash32 HistoryCommentRevisionHash32)
forall a. Ord a => [a] -> Set a
Set.fromList [Either HistoryCommentHash32 HistoryCommentRevisionHash32]
unknownHashes) of
                Maybe
  (NESet (Either HistoryCommentHash32 HistoryCommentRevisionHash32))
Nothing -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
                Just NESet (Either HistoryCommentHash32 HistoryCommentRevisionHash32)
unknownHashesSet -> do
                  IO Bool -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Bool -> IO ()) -> (STM Bool -> IO Bool) -> STM Bool -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM Bool -> IO Bool
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM Bool -> IO ()) -> STM Bool -> IO ()
forall a b. (a -> b) -> a -> b
$ MsgOrError err HistoryCommentDownloaderChunk -> STM Bool
send (MsgOrError err HistoryCommentDownloaderChunk -> STM Bool)
-> MsgOrError err HistoryCommentDownloaderChunk -> STM Bool
forall a b. (a -> b) -> a -> b
$ HistoryCommentDownloaderChunk
-> MsgOrError err HistoryCommentDownloaderChunk
forall err a. a -> MsgOrError err a
Msg (HistoryCommentDownloaderChunk
 -> MsgOrError err HistoryCommentDownloaderChunk)
-> HistoryCommentDownloaderChunk
-> MsgOrError err HistoryCommentDownloaderChunk
forall a b. (a -> b) -> a -> b
$ NESet (Either HistoryCommentHash32 HistoryCommentRevisionHash32)
-> HistoryCommentDownloaderChunk
RequestCommentsChunk NESet (Either HistoryCommentHash32 HistoryCommentRevisionHash32)
unknownHashesSet
            Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not Bool
closed) IO ()
loop
      IO ()
loop
      IO Bool -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Bool -> IO ()) -> (STM Bool -> IO Bool) -> STM Bool -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM Bool -> IO Bool
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM Bool -> IO ()) -> STM Bool -> IO ()
forall a b. (a -> b) -> a -> b
$ MsgOrError err HistoryCommentDownloaderChunk -> STM Bool
send (MsgOrError err HistoryCommentDownloaderChunk -> STM Bool)
-> MsgOrError err HistoryCommentDownloaderChunk -> STM Bool
forall a b. (a -> b) -> a -> b
$ HistoryCommentDownloaderChunk
-> MsgOrError err HistoryCommentDownloaderChunk
forall err a. a -> MsgOrError err a
Msg (HistoryCommentDownloaderChunk
 -> MsgOrError err HistoryCommentDownloaderChunk)
-> HistoryCommentDownloaderChunk
-> MsgOrError err HistoryCommentDownloaderChunk
forall a b. (a -> b) -> a -> b
$ HistoryCommentDownloaderChunk
DoneCheckingHashesChunk
      DebugFlag -> String -> IO ()
forall (m :: * -> *). Monad m => DebugFlag -> String -> m ()
Debug.debugLogM DebugFlag
Debug.HistoryComments String
"Hash checking worker finished"
    -- | Drain the incoming message stream and route every message to the queue
    -- that the corresponding worker consumes.
    --
    -- Arguments, in order:
    --
    -- * @recv@: STM action yielding the next incoming message, or 'Nothing'
    --   (presumably once the connection has been closed — confirm against the
    --   websocket queue implementation).
    -- * @errMVar@: filled with a description of the failure when the stream
    --   reports a deserialisation failure or a server-side error.
    -- * @hashesToCheckQ@: receives every entry of a 'PossiblyNewHashesChunk';
    --   closed on 'DoneSendingHashesChunk' or when @recv@ yields 'Nothing'.
    -- * @commentsQ@: receives comments ('Left') and comment revisions ('Right');
    --   closed when @recv@ yields 'Nothing'.
    --
    -- The loop ends when @recv@ yields 'Nothing' or when an error message is
    -- received; it then logs "Receiver worker finished".
    --
    -- NOTE(review): on the two error paths the queues are left open; presumably
    -- the caller tears down the other workers once @errMVar@ is filled — confirm.
    receiverWorker ::
      STM (Maybe (MsgOrError DownloadCommentsResponse HistoryCommentUploaderChunk)) ->
      TMVar Text ->
      TBMQueue
        ( HistoryCommentHash32,
          [HistoryCommentRevisionHash32]
        ) ->
      TBMQueue (Either HistoryComment HistoryCommentRevision) ->
      IO ()
    receiverWorker recv errMVar hashesToCheckQ commentsQ = do
      -- Each iteration runs exactly one STM transaction. The transaction returns
      -- the IO action to run next: 'loop' to keep receiving, or 'pure ()' to stop.
      let loop :: IO ()
          loop = do
            next <- atomically do
              recv >>= \case
                Nothing -> do
                  -- No more input: close both queues so their readers can finish.
                  closeTBMQueue hashesToCheckQ
                  closeTBMQueue commentsQ
                  pure (pure ())
                Just (DeserialiseFailure err) -> do
                  -- NOTE(review): 'putTMVar' retries while the TMVar is already
                  -- full; confirm that at most one error is ever stored here.
                  putTMVar errMVar $ "downloadHistoryComments: deserialisation failure: " <> err
                  pure (pure ())
                Just (UserErr err) -> do
                  putTMVar errMVar $ "downloadHistoryComments: server error: " <> tShow err
                  pure (pure ())
                Just (Msg msg) -> do
                  case msg of
                    PossiblyNewHashesChunk hashesToCheck ->
                      -- NOTE(review): the whole chunk is enqueued in the same
                      -- transaction as the receive. If a chunk can hold more
                      -- entries than the queue's bound, this transaction could
                      -- never commit — confirm chunk size vs. queue capacity.
                      for_ hashesToCheck (writeTBMQueue hashesToCheckQ)
                    DoneSendingHashesChunk ->
                      closeTBMQueue hashesToCheckQ
                    HistoryCommentChunk comment ->
                      writeTBMQueue commentsQ (Left comment)
                    HistoryCommentRevisionChunk revision ->
                      writeTBMQueue commentsQ (Right revision)
                  pure loop
            next
      loop
      Debug.debugLogM Debug.HistoryComments "Receiver worker finished"
    -- Batch size constant; judging by the name it bounds how many comments are
    -- inserted per batch, but its use site is outside this view — TODO confirm.
    insertCommentBatchSize :: Int
    insertCommentBatchSize = 100