ENH break up input files to thread them

This commit is contained in:
Nathan Dwarshuis 2023-07-09 00:16:57 -04:00
parent bf1434542f
commit 9c93ad25af
4 changed files with 99 additions and 30 deletions

View File

@ -2,13 +2,15 @@
module Main (main) where
import Control.Concurrent
import Control.Monad.Except
import Control.Monad.IO.Rerunnable
import Control.Monad.Logger
import Control.Monad.Reader
import Data.Bitraversable
import qualified Data.Text.IO as TI
import Database.Persist.Monad
import Dhall hiding (double, record)
import qualified Dhall hiding (double, record)
import Internal.Budget
import Internal.Database
import Internal.History
@ -30,14 +32,26 @@ main = parse =<< execParser o
<> header "pwncash - your budget, your life"
)
data Options = Options FilePath Mode
type ConfigPath = FilePath
type BudgetPath = FilePath
type HistoryPath = FilePath
data Options = Options !ConfigPath !Mode
data Mode
= Reset
| DumpCurrencies
| DumpAccounts
| DumpAccountKeys
| Sync
| Sync !SyncOptions
data SyncOptions = SyncOptions
{ syncBudgets :: ![BudgetPath]
, syncHistories :: ![HistoryPath]
, syncThreads :: !Int
}
configFile :: Parser FilePath
configFile =
@ -104,6 +118,35 @@ sync =
<> short 'S'
<> help "Sync config to database"
)
<*> syncOptions
syncOptions :: Parser SyncOptions
syncOptions =
SyncOptions
<$> many
( strOption
( long "budget"
<> short 'b'
<> metavar "BUDGET"
<> help "path to a budget config"
)
)
<*> many
( strOption
( long "history"
<> short 'H'
<> metavar "HISTORY"
<> help "path to a history config"
)
)
<*> option
auto
( long "threads"
<> short 't'
<> metavar "THREADS"
<> value 1
<> help "number of threads for syncing"
)
parse :: Options -> IO ()
parse (Options c Reset) = do
@ -112,7 +155,8 @@ parse (Options c Reset) = do
parse (Options c DumpAccounts) = runDumpAccounts c
parse (Options c DumpAccountKeys) = runDumpAccountKeys c
parse (Options c DumpCurrencies) = runDumpCurrencies c
parse (Options c Sync) = runSync c
parse (Options c (Sync SyncOptions {syncBudgets, syncHistories, syncThreads})) =
runSync syncThreads c syncBudgets syncHistories
runDumpCurrencies :: MonadUnliftIO m => FilePath -> m ()
runDumpCurrencies c = do
@ -160,29 +204,42 @@ runDumpAccountKeys c = do
t3 (_, _, x) = x
double x = (x, x)
runSync :: FilePath -> IO ()
runSync c = do
runSync :: Int -> FilePath -> [FilePath] -> [FilePath] -> IO ()
runSync threads c bs hs = do
setNumCapabilities threads
-- putStrLn "reading config"
config <- readConfig c
-- putStrLn "reading statements"
(bs', hs') <-
fmap (bimap concat concat . partitionEithers) $
pooledMapConcurrentlyN threads (bimapM readDhall readDhall) $
(Left <$> bs) ++ (Right <$> hs)
pool <- runNoLoggingT $ mkPool $ sqlConfig config
putStrLn "doing other stuff"
setNumCapabilities 1
handle err $ do
-- _ <- askLoggerIO
-- Get the current DB state.
(state, updates) <- runSqlQueryT pool $ do
runMigration migrateAll
liftIOExceptT $ getDBState config
liftIOExceptT $ getDBState config bs' hs'
-- Read raw transactions according to state. If a transaction is already in
-- the database, don't read it but record the commit so we can update it.
(rus, is) <-
flip runReaderT state $ do
let (hTs, hSs) = splitHistory $ statements config
let (hTs, hSs) = splitHistory hs'
-- TODO for some mysterious reason using multithreading just for this
-- little bit slows the program down by several seconds
-- lift $ setNumCapabilities threads
hSs' <- mapErrorsIO (readHistStmt root) hSs
-- lift $ setNumCapabilities 1
-- lift $ print $ length $ lefts hSs'
-- lift $ print $ length $ rights hSs'
hTs' <- liftIOExceptT $ mapErrors readHistTransfer hTs
-- lift $ print $ length $ lefts hTs'
bTs <- liftIOExceptT $ mapErrors readBudget $ budget config
bTs <- liftIOExceptT $ mapErrors readBudget bs'
-- lift $ print $ length $ lefts bTs
return $ second concat $ partitionEithers $ hSs' ++ hTs' ++ bTs
-- print $ length $ kmNewCommits state
@ -218,4 +275,11 @@ runSync c = do
-- showBalances
readConfig :: MonadUnliftIO m => FilePath -> m Config
readConfig confpath = liftIO $ unfix <$> Dhall.inputFile Dhall.auto confpath
readConfig = fmap unfix . readDhall
readDhall :: Dhall.FromDhall a => MonadUnliftIO m => FilePath -> m a
readDhall confpath = do
-- tid <- myThreadId
-- liftIO $ print $ show tid
-- liftIO $ print confpath
liftIO $ Dhall.inputFile Dhall.auto confpath

View File

@ -109,16 +109,12 @@ nukeTables = do
-- toFullPath path name = T.unwords [unValue @T.Text path, "/", unValue @T.Text name]
-- toBal = maybe "???" (fmtRational 2) . unValue
hashConfig :: Config -> [Int]
hashConfig
Config_
{ budget = bs
, statements = ss
} = (hash <$> bs) ++ (hash <$> ms) ++ (hash <$> ps)
where
(ms, ps) = partitionEithers $ fmap go ss
go (HistTransfer x) = Left x
go (HistStatement x) = Right x
hashConfig :: [Budget] -> [History] -> [Int]
hashConfig bs hs = (hash <$> bs) ++ (hash <$> ms) ++ (hash <$> ps)
where
(ms, ps) = partitionEithers $ fmap go hs
go (HistTransfer x) = Left x
go (HistStatement x) = Right x
setDiff :: Eq a => [a] -> [a] -> ([a], [a])
-- setDiff = setDiff' (==)
@ -148,9 +144,9 @@ nukeDBHash h = deleteE $ do
nukeDBHashes :: MonadSqlQuery m => [Int] -> m ()
nukeDBHashes = mapM_ nukeDBHash
getConfigHashes :: MonadSqlQuery m => Config -> m ([Int], [Int])
getConfigHashes c = do
let ch = hashConfig c
getConfigHashes :: MonadSqlQuery m => [Budget] -> [History] -> m ([Int], [Int])
getConfigHashes bs hs = do
let ch = hashConfig bs hs
dh <- getDBHashes
return $ setDiff dh ch
@ -306,9 +302,11 @@ indexAcntRoot r =
getDBState
:: (MonadInsertError m, MonadSqlQuery m)
=> Config
-> [Budget]
-> [History]
-> m (DBState, DBUpdates)
getDBState c = do
(del, new) <- getConfigHashes c
getDBState c bs hs = do
(del, new) <- getConfigHashes bs hs
combineError bi si $ \b s ->
( DBState
{ kmCurrency = currencyMap cs
@ -327,8 +325,8 @@ getDBState c = do
}
)
where
bi = liftExcept $ resolveDaySpan $ budgetInterval $ global c
si = liftExcept $ resolveDaySpan $ statementInterval $ global c
bi = liftExcept $ resolveDaySpan $ budgetInterval $ scope c
si = liftExcept $ resolveDaySpan $ statementInterval $ scope c
(acnts, paths, am) = indexAcntRoot $ accounts c
cs = currency2Record <$> currencies c
ts = toRecord <$> tags c

View File

@ -380,10 +380,8 @@ deriving instance FromDhall AccountRootF
type AccountRoot = AccountRoot_ AccountTree
data Config_ a = Config_
{ global :: !TemporalScope
, budget :: ![Budget]
{ scope :: !TemporalScope
, currencies :: ![Currency]
, statements :: ![History]
, accounts :: !a
, tags :: ![Tag]
, sqlConfig :: !SqlConfig

View File

@ -27,6 +27,7 @@ module Internal.Utils
, combineErrorIOM3
, collectErrorsIO
, mapErrorsIO
, mapErrorsPooledIO
, showError
, acntPath2Text
, showT
@ -387,6 +388,14 @@ combineErrorIOM3 :: MonadUnliftIO m => m a -> m b -> m c -> (a -> b -> c -> m d)
combineErrorIOM3 a b c f =
combineErrorIOM2 (combineErrorIOM2 a b (curry return)) c $ \(x, y) z -> f x y z
mapErrorsPooledIO :: (Traversable t, MonadUnliftIO m) => Int -> (a -> m b) -> t a -> m (t b)
mapErrorsPooledIO t f xs = pooledMapConcurrentlyN t go $ enumTraversable xs
where
go (n, x) = catch (f x) $ \(InsertException e) -> do
es <- fmap catMaybes $ mapM (err . f) $ drop (n + 1) $ toList xs
throwIO $ InsertException $ foldr (<>) e es
err x = catch (Nothing <$ x) $ \(InsertException es) -> pure $ Just es
mapErrorsIO :: (Traversable t, MonadUnliftIO m) => (a -> m b) -> t a -> m (t b)
mapErrorsIO f xs = mapM go $ enumTraversable xs
where