From 9c93ad25af3e43ad5598a0f0d723b068c47d31a1 Mon Sep 17 00:00:00 2001
From: ndwarshuis
Date: Sun, 9 Jul 2023 00:16:57 -0400
Subject: [PATCH] ENH break up input files to thread them

---
Review notes (this section is ignored when the patch is applied):
  * runSync clamps the thread count to at least 1 and no longer prints a
    leftover debug message.
  * mapErrorsPooledIO runs every action once and combines errors afterwards.
  * Hunk ranges and the diffstat were recomputed by hand; the blob ids on
    the "index" lines and the commit id above were not regenerated.

 app/Main.hs                 | 94 +++++++++++++++++++++++++++++++++----
 lib/Internal/Database.hs    | 33 +++++++------
 lib/Internal/Types/Dhall.hs |  4 +-
 lib/Internal/Utils.hs       | 17 +++++++
 4 files changed, 118 insertions(+), 30 deletions(-)

diff --git a/app/Main.hs b/app/Main.hs
index 60c403e..87eb835 100644
--- a/app/Main.hs
+++ b/app/Main.hs
@@ -2,13 +2,15 @@
 
 module Main (main) where
 
+import Control.Concurrent
 import Control.Monad.Except
 import Control.Monad.IO.Rerunnable
 import Control.Monad.Logger
 import Control.Monad.Reader
+import Data.Bitraversable
 import qualified Data.Text.IO as TI
 import Database.Persist.Monad
-import Dhall hiding (double, record)
+import qualified Dhall hiding (double, record)
 import Internal.Budget
 import Internal.Database
 import Internal.History
@@ -30,14 +32,27 @@ main = parse =<< execParser o
             <> header "pwncash - your budget, your life"
         )
 
-data Options = Options FilePath Mode
+type ConfigPath = FilePath
+
+type BudgetPath = FilePath
+
+type HistoryPath = FilePath
+
+data Options = Options !ConfigPath !Mode
 
 data Mode
   = Reset
   | DumpCurrencies
   | DumpAccounts
   | DumpAccountKeys
-  | Sync
+  | Sync !SyncOptions
+
+-- | Inputs for the sync mode.
+data SyncOptions = SyncOptions
+  { syncBudgets :: ![BudgetPath]
+  , syncHistories :: ![HistoryPath]
+  , syncThreads :: !Int
+  }
 
 configFile :: Parser FilePath
 configFile =
@@ -104,6 +119,37 @@ sync =
         <> short 'S'
         <> help "Sync config to database"
     )
+    <*> syncOptions
+
+-- | Parser for the flags that follow the sync switch: repeatable
+-- @--budget@/@--history@ paths and a @--threads@ count defaulting to 1.
+syncOptions :: Parser SyncOptions
+syncOptions =
+  SyncOptions
+    <$> many
+      ( strOption
+          ( long "budget"
+              <> short 'b'
+              <> metavar "BUDGET"
+              <> help "path to a budget config"
+          )
+      )
+    <*> many
+      ( strOption
+          ( long "history"
+              <> short 'H'
+              <> metavar "HISTORY"
+              <> help "path to a history config"
+          )
+      )
+    <*> option
+      auto
+      ( long "threads"
+          <> short 't'
+          <> metavar "THREADS"
+          <> value 1
+          <> help "number of threads for syncing"
+      )
 
 parse :: Options -> IO ()
 parse (Options c Reset) = do
@@ -112,7 +158,8 @@ parse (Options c Reset) = do
 parse (Options c DumpAccounts) = runDumpAccounts c
 parse (Options c DumpAccountKeys) = runDumpAccountKeys c
 parse (Options c DumpCurrencies) = runDumpCurrencies c
-parse (Options c Sync) = runSync c
+parse (Options c (Sync SyncOptions {syncBudgets, syncHistories, syncThreads})) =
+  runSync syncThreads c syncBudgets syncHistories
 
 runDumpCurrencies :: MonadUnliftIO m => FilePath -> m ()
 runDumpCurrencies c = do
@@ -160,29 +207,48 @@ runDumpAccountKeys c = do
     t3 (_, _, x) = x
     double x = (x, x)
 
-runSync :: FilePath -> IO ()
-runSync c = do
+-- | Entry point for the sync mode. The budget and history files are read
+-- concurrently (at most @threads@ at a time) before the database work,
+-- which then runs on a single capability.
+runSync :: Int -> FilePath -> [FilePath] -> [FilePath] -> IO ()
+runSync threads c bs hs = do
+  -- 'setNumCapabilities' and 'pooledMapConcurrentlyN' both reject counts
+  -- below 1, so clamp whatever was passed via --threads
+  let nThreads = max 1 threads
+  setNumCapabilities nThreads
+  -- putStrLn "reading config"
   config <- readConfig c
+  -- putStrLn "reading statements"
+  (bs', hs') <-
+    fmap (bimap concat concat . partitionEithers) $
+      pooledMapConcurrentlyN nThreads (bimapM readDhall readDhall) $
+        (Left <$> bs) ++ (Right <$> hs)
   pool <- runNoLoggingT $ mkPool $ sqlConfig config
+  -- putStrLn "doing other stuff"
+  setNumCapabilities 1
   handle err $ do
     -- _ <- askLoggerIO
 
     -- Get the current DB state.
     (state, updates) <- runSqlQueryT pool $ do
       runMigration migrateAll
-      liftIOExceptT $ getDBState config
+      liftIOExceptT $ getDBState config bs' hs'
 
     -- Read raw transactions according to state. If a transaction is already in
     -- the database, don't read it but record the commit so we can update it.
     (rus, is) <- flip runReaderT state $ do
-      let (hTs, hSs) = splitHistory $ statements config
+      let (hTs, hSs) = splitHistory hs'
+      -- TODO for some mysterious reason using multithreading just for this
+      -- little bit slows the program down by several seconds
+      -- lift $ setNumCapabilities threads
       hSs' <- mapErrorsIO (readHistStmt root) hSs
+      -- lift $ setNumCapabilities 1
       -- lift $ print $ length $ lefts hSs'
       -- lift $ print $ length $ rights hSs'
       hTs' <- liftIOExceptT $ mapErrors readHistTransfer hTs
       -- lift $ print $ length $ lefts hTs'
-      bTs <- liftIOExceptT $ mapErrors readBudget $ budget config
+      bTs <- liftIOExceptT $ mapErrors readBudget bs'
       -- lift $ print $ length $ lefts bTs
       return $ second concat $ partitionEithers $ hSs' ++ hTs' ++ bTs
     -- print $ length $ kmNewCommits state
@@ -218,4 +284,12 @@ runSync c = do
 -- showBalances
 
 readConfig :: MonadUnliftIO m => FilePath -> m Config
-readConfig confpath = liftIO $ unfix <$> Dhall.inputFile Dhall.auto confpath
+readConfig = fmap unfix . readDhall
+
+-- | Evaluate the Dhall file at the given path and decode the result.
+readDhall :: (Dhall.FromDhall a, MonadIO m) => FilePath -> m a
+readDhall confpath = do
+  -- tid <- myThreadId
+  -- liftIO $ print $ show tid
+  -- liftIO $ print confpath
+  liftIO $ Dhall.inputFile Dhall.auto confpath
diff --git a/lib/Internal/Database.hs b/lib/Internal/Database.hs
index 0209355..2ede03c 100644
--- a/lib/Internal/Database.hs
+++ b/lib/Internal/Database.hs
@@ -109,16 +109,13 @@ nukeTables = do
 -- toFullPath path name = T.unwords [unValue @T.Text path, "/", unValue @T.Text name]
 -- toBal = maybe "???" (fmtRational 2) . unValue
 
-hashConfig :: Config -> [Int]
-hashConfig
-  Config_
-    { budget = bs
-    , statements = ss
-    } = (hash <$> bs) ++ (hash <$> ms) ++ (hash <$> ps)
-    where
-      (ms, ps) = partitionEithers $ fmap go ss
-      go (HistTransfer x) = Left x
-      go (HistStatement x) = Right x
+-- | Hash all budgets, then all history transfers, then all history statements.
+hashConfig :: [Budget] -> [History] -> [Int]
+hashConfig bs hs = (hash <$> bs) ++ (hash <$> ms) ++ (hash <$> ps)
+  where
+    (ms, ps) = partitionEithers $ fmap go hs
+    go (HistTransfer x) = Left x
+    go (HistStatement x) = Right x
 
 setDiff :: Eq a => [a] -> [a] -> ([a], [a])
 -- setDiff = setDiff' (==)
@@ -148,9 +145,9 @@ nukeDBHash h = deleteE $ do
 nukeDBHashes :: MonadSqlQuery m => [Int] -> m ()
 nukeDBHashes = mapM_ nukeDBHash
 
-getConfigHashes :: MonadSqlQuery m => Config -> m ([Int], [Int])
-getConfigHashes c = do
-  let ch = hashConfig c
+getConfigHashes :: MonadSqlQuery m => [Budget] -> [History] -> m ([Int], [Int])
+getConfigHashes bs hs = do
+  let ch = hashConfig bs hs
   dh <- getDBHashes
   return $ setDiff dh ch
 
@@ -306,9 +303,11 @@ indexAcntRoot r =
 getDBState
   :: (MonadInsertError m, MonadSqlQuery m)
   => Config
+  -> [Budget]
+  -> [History]
   -> m (DBState, DBUpdates)
-getDBState c = do
-  (del, new) <- getConfigHashes c
+getDBState c bs hs = do
+  (del, new) <- getConfigHashes bs hs
   combineError bi si $ \b s ->
     ( DBState
         { kmCurrency = currencyMap cs
@@ -327,8 +326,8 @@ getDBState c = do
         }
     )
   where
-    bi = liftExcept $ resolveDaySpan $ budgetInterval $ global c
-    si = liftExcept $ resolveDaySpan $ statementInterval $ global c
+    bi = liftExcept $ resolveDaySpan $ budgetInterval $ scope c
+    si = liftExcept $ resolveDaySpan $ statementInterval $ scope c
     (acnts, paths, am) = indexAcntRoot $ accounts c
     cs = currency2Record <$> currencies c
     ts = toRecord <$> tags c
diff --git a/lib/Internal/Types/Dhall.hs b/lib/Internal/Types/Dhall.hs
index 6a060a2..5cb6af0 100644
--- a/lib/Internal/Types/Dhall.hs
+++ b/lib/Internal/Types/Dhall.hs
@@ -380,10 +380,8 @@ deriving instance FromDhall AccountRootF
 type AccountRoot = AccountRoot_ AccountTree
 
 data Config_ a = Config_
-  { global :: !TemporalScope
-  , budget :: ![Budget]
+  { scope :: !TemporalScope
   , currencies :: ![Currency]
-  , statements :: ![History]
   , accounts :: !a
   , tags :: ![Tag]
   , sqlConfig :: !SqlConfig
diff --git a/lib/Internal/Utils.hs b/lib/Internal/Utils.hs
index bf6168f..32cd18f 100644
--- a/lib/Internal/Utils.hs
+++ b/lib/Internal/Utils.hs
@@ -27,6 +27,7 @@ module Internal.Utils
   , combineErrorIOM3
   , collectErrorsIO
   , mapErrorsIO
+  , mapErrorsPooledIO
   , showError
   , acntPath2Text
   , showT
@@ -387,6 +388,22 @@ combineErrorIOM3 :: MonadUnliftIO m => m a -> m b -> m c -> (a -> b -> c -> m d)
 combineErrorIOM3 a b c f =
   combineErrorIOM2 (combineErrorIOM2 a b (curry return)) c $ \(x, y) z -> f x y z
 
+-- | Run an action over every element using at most @t@ worker threads,
+-- collecting all insert errors instead of stopping at the first one.
+--
+-- Each action runs exactly once; failures are captured as 'Left' inside the
+-- pool and combined afterwards, so a failing element neither cancels the
+-- other workers nor causes the remaining elements to be evaluated again.
+mapErrorsPooledIO :: (Traversable t, MonadUnliftIO m) => Int -> (a -> m b) -> t a -> m (t b)
+mapErrorsPooledIO t f xs = do
+  rs <- pooledMapConcurrentlyN t go xs
+  case [e | Left e <- toList rs] of
+    e : es -> throwIO $ InsertException $ foldr (<>) e es
+    -- no 'Left' remains here, so this only unwraps the results
+    [] -> mapM (either (throwIO . InsertException) pure) rs
+  where
+    go x = catch (Right <$> f x) $ \(InsertException e) -> pure $ Left e
+
 mapErrorsIO :: (Traversable t, MonadUnliftIO m) => (a -> m b) -> t a -> m (t b)
 mapErrorsIO f xs = mapM go $ enumTraversable xs
   where