mirror of
https://github.com/graninas/Hydra.git
synced 2025-01-08 18:27:55 +03:00
WIP
This commit is contained in:
parent
3b2b7b30ba
commit
6a84764787
37
app/astro/Astro/SqlDB/CatalogueDB.hs
Normal file
37
app/astro/Astro/SqlDB/CatalogueDB.hs
Normal file
@ -0,0 +1,37 @@
|
||||
{-# LANGUAGE DeriveAnyClass #-}
|
||||
|
||||
module Astro.SqlDB.CatalogueDB where
|
||||
|
||||
import Hydra.Prelude
|
||||
|
||||
import Database.Beam
|
||||
|
||||
|
||||
data MeteorT f = Meteor
|
||||
{ _id :: Columnar f Int
|
||||
, _size :: Columnar f Int
|
||||
, _mass :: Columnar f Int
|
||||
, _azimuth :: Columnar f Int
|
||||
, _altitude :: Columnar f Int
|
||||
, _timestamp :: Columnar f UTCTime
|
||||
}
|
||||
deriving (Show, Eq, Ord, Generic, Beamable)
|
||||
|
||||
type Meteor = MeteorT Identity
|
||||
type MeteorId = PrimaryKey MeteorT Identity
|
||||
|
||||
|
||||
instance Table MeteorT where
|
||||
data PrimaryKey MeteorT f = MeteorId (Columnar f Int)
|
||||
deriving (Generic, Beamable)
|
||||
primaryKey = MeteorId . _id
|
||||
|
||||
|
||||
data CatalogueDB f
|
||||
= CatalogueDB
|
||||
{ _meteors :: f (TableEntity MeteorT)
|
||||
}
|
||||
deriving (Generic, Database be)
|
||||
|
||||
catalogueDB :: DatabaseSettings be CatalogueDB
|
||||
catalogueDB = defaultDbSettings
|
@ -7,3 +7,4 @@ import Hydra.Core.Domain.Process as X
|
||||
import Hydra.Core.Domain.State as X
|
||||
import Hydra.Core.Domain.DB as X
|
||||
import Hydra.Core.Domain.KVDB as X
|
||||
import Hydra.Core.Domain.SQLDB as X
|
||||
|
@ -10,15 +10,15 @@ import qualified Data.Aeson as A
|
||||
import qualified Data.ByteString.Lazy as LBS
|
||||
|
||||
data DBErrorType
|
||||
= SystemError
|
||||
| KeyNotFound
|
||||
| InvalidType
|
||||
| DecodingFailed
|
||||
| UnknownDBError
|
||||
deriving (Generic, Ord, Eq, Enum, Bounded, Show, Read)
|
||||
= SystemError
|
||||
| KeyNotFound
|
||||
| InvalidType
|
||||
| DecodingFailed
|
||||
| UnknownDBError
|
||||
deriving (Generic, Ord, Eq, Enum, Bounded, Show, Read)
|
||||
|
||||
data DBError = DBError DBErrorType Text
|
||||
deriving (Generic, Ord, Eq, Show, Read)
|
||||
deriving (Generic, Ord, Eq, Show, Read)
|
||||
|
||||
type DBResult a = Either DBError a
|
||||
|
||||
@ -30,7 +30,13 @@ data DBType
|
||||
| RocksDB
|
||||
deriving (Show, Read, Ord, Eq, Enum, Bounded, Generic, ToJSON, FromJSON)
|
||||
|
||||
data SqlDBType
|
||||
= SQLite
|
||||
deriving (Show, Read, Ord, Eq, Enum, Bounded, Generic, ToJSON, FromJSON)
|
||||
|
||||
type DBName = String
|
||||
|
||||
data DBHandle db = DBHandle DBType DBName
|
||||
deriving (Show, Generic)
|
||||
deriving (Show, Generic)
|
||||
|
||||
data SqlDBHandle = SQLiteHandle SqlDBType DBName
|
||||
|
23
src/Hydra/Core/Domain/SQLDB.hs
Normal file
23
src/Hydra/Core/Domain/SQLDB.hs
Normal file
@ -0,0 +1,23 @@
|
||||
{-# LANGUAGE AllowAmbiguousTypes #-}
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
{-# LANGUAGE FlexibleContexts #-}
|
||||
{-# LANGUAGE FunctionalDependencies #-}
|
||||
{-# LANGUAGE MultiParamTypeClasses #-}
|
||||
{-# LANGUAGE PolyKinds #-}
|
||||
{-# LANGUAGE TypeFamilies #-}
|
||||
{-# LANGUAGE DeriveAnyClass #-}
|
||||
|
||||
|
||||
module Hydra.Core.Domain.SQLDB where
|
||||
|
||||
import Hydra.Prelude
|
||||
|
||||
import qualified Data.Aeson as A
|
||||
import qualified Data.ByteString.Lazy as LBS
|
||||
import System.FilePath ((</>))
|
||||
|
||||
import Hydra.Core.Domain.DB
|
||||
|
||||
data SqlDBConfig
|
||||
= SQLiteConfig DBName
|
||||
deriving (Show, Read, Ord, Eq, Generic, ToJSON, FromJSON)
|
@ -6,8 +6,6 @@ import qualified Data.Map as Map
|
||||
|
||||
import qualified Hydra.Core.Domain as D
|
||||
import qualified Hydra.Core.Language as L
|
||||
import qualified Hydra.Core.Logger.Impl.HsLogger as Impl
|
||||
import qualified Hydra.Core.Logger.Impl.HsLoggerInterpreter as I
|
||||
|
||||
import qualified Database.RocksDB as Rocks
|
||||
import qualified Database.Redis as Redis
|
||||
@ -30,18 +28,15 @@ initRocksDB'
|
||||
-> String
|
||||
-> IO (D.DBResult (D.DBHandle db))
|
||||
initRocksDB' rocksDBsVars cfg@(D.RocksDBConfig _ createIfMiss errorIfErr) dbname = do
|
||||
rocksDBs <- atomically $ takeTMVar rocksDBsVars
|
||||
let dbPath = D.getKVDBName cfg
|
||||
eDb <- try $ Rocks.open dbPath $ initRocksOptions createIfMiss errorIfErr
|
||||
case eDb of
|
||||
Left (err :: SomeException) -> do
|
||||
atomically $ putTMVar rocksDBsVars rocksDBs
|
||||
pure $ Left $ D.DBError D.SystemError $ show err
|
||||
Left (err :: SomeException) -> pure $ Left $ D.DBError D.SystemError $ show err
|
||||
Right db -> do
|
||||
dbM <- newMVar db
|
||||
atomically
|
||||
$ putTMVar rocksDBsVars
|
||||
$ Map.insert dbname dbM rocksDBs
|
||||
atomically $ do
|
||||
rocksDBs <- takeTMVar rocksDBsVars
|
||||
putTMVar rocksDBsVars $ Map.insert dbname dbM rocksDBs
|
||||
pure $ Right $ D.DBHandle D.RocksDB dbname
|
||||
|
||||
-- TODO: defaultConnectInfo
|
||||
@ -53,16 +48,13 @@ initRedisDB'
|
||||
-> String
|
||||
-> IO (D.DBResult (D.DBHandle db))
|
||||
initRedisDB' redisConnsVar _ dbname = do
|
||||
mConns <- atomically $ takeTMVar redisConnsVar
|
||||
eConn <- try $ Redis.checkedConnect Redis.defaultConnectInfo
|
||||
case eConn of
|
||||
Left (err :: SomeException) -> do
|
||||
atomically $ putTMVar redisConnsVar mConns
|
||||
pure $ Left $ D.DBError D.SystemError $ show err
|
||||
Left (err :: SomeException) -> pure $ Left $ D.DBError D.SystemError $ show err
|
||||
Right conn -> do
|
||||
atomically
|
||||
$ putTMVar redisConnsVar
|
||||
$ Map.insert dbname conn mConns
|
||||
atomically $ do
|
||||
mConns <- takeTMVar redisConnsVar
|
||||
putTMVar redisConnsVar $ Map.insert dbname conn mConns
|
||||
pure $ Right $ D.DBHandle D.Redis dbname
|
||||
|
||||
deInitRocksDB :: RocksDBHandle -> IO ()
|
||||
|
@ -10,8 +10,10 @@ import qualified Hydra.Core.Logger.Impl.HsLogger as Impl
|
||||
import qualified Hydra.Core.Logger.Impl.HsLoggerInterpreter as I
|
||||
|
||||
import qualified Hydra.Core.KVDBRuntime as R
|
||||
import qualified Hydra.Core.SqlDBRuntime as R
|
||||
import qualified Database.RocksDB as Rocks
|
||||
import qualified Database.Redis as Redis
|
||||
import qualified Database.SQLite.Simple as SQLite
|
||||
|
||||
-- | Runtime data for the concrete logger impl.
|
||||
newtype LoggerRuntime = LoggerRuntime
|
||||
@ -28,6 +30,7 @@ data ProcessRuntime = ProcessRuntime
|
||||
data CoreRuntime = CoreRuntime
|
||||
{ _rocksDBs :: R.RocksDBHandles
|
||||
, _redisConns :: R.RedisConnections
|
||||
, _sqliteConns :: R.SQLiteDBConns
|
||||
, _loggerRuntime :: LoggerRuntime
|
||||
, _stateRuntime :: StateRuntime
|
||||
, _processRuntime :: ProcessRuntime
|
||||
@ -72,6 +75,7 @@ createCoreRuntime :: LoggerRuntime -> IO CoreRuntime
|
||||
createCoreRuntime loggerRt = CoreRuntime
|
||||
<$> newTMVarIO Map.empty
|
||||
<*> newTMVarIO Map.empty
|
||||
<*> newTMVarIO Map.empty
|
||||
<*> pure loggerRt
|
||||
<*> createStateRuntime
|
||||
<*> createProcessRuntime
|
||||
@ -88,6 +92,7 @@ clearCoreRuntime coreRt =
|
||||
(clearProcessRuntime $ _processRuntime coreRt)
|
||||
`finally` (R.closeRocksDBs $ _rocksDBs coreRt)
|
||||
`finally` (R.closeRedisConns $ _redisConns coreRt)
|
||||
`finally` (R.closeSQLiteConns $ _sqliteConns coreRt)
|
||||
|
||||
-- TODO: Church version of flusher.
|
||||
-- | Writes all stm entries into real logger.
|
||||
|
23
src/Hydra/Core/SqlDB/Interpreter.hs
Normal file
23
src/Hydra/Core/SqlDB/Interpreter.hs
Normal file
@ -0,0 +1,23 @@
|
||||
module Hydra.Core.SqlDB.Interpreter where
|
||||
|
||||
import Hydra.Prelude
|
||||
|
||||
import qualified Hydra.Core.Language as L
|
||||
import qualified Hydra.Core.RLens as RLens
|
||||
import qualified Hydra.Core.Runtime as R
|
||||
import qualified Hydra.Core.Domain as D
|
||||
|
||||
-- aggregate_ (\t -> ( as_ @Double @QAggregateContext $ customExpr_ (\bytes ms -> "regr_intercept(" <> bytes <> ", " <> ms <> ")") (trackBytes t) (trackMilliseconds t)
|
||||
-- , as_ @Double @QAggregateContext $ customExpr_ (\bytes ms -> "regr_slope(" <> bytes <> ", " <> ms <> ")") (trackBytes t) (trackMilliseconds t) )) $
|
||||
-- all_ (track chinookDb)
|
||||
|
||||
-- SELECT regr_intercept(("t0"."Bytes"), ("t0"."Milliseconds")) AS "res0",
|
||||
-- regr_slope(("t0"."Bytes"), ("t0"."Milliseconds")) AS "res1"
|
||||
-- FROM "Track" AS "t0"
|
||||
|
||||
|
||||
interpretSqlDBF :: db -> L.SqlDBF a -> IO a
|
||||
interpretSqlDBF db (RawQuery rawQ next) = error "not implemented"
|
||||
|
||||
runSqlDBL :: db -> L.SqlDBL a -> IO a
|
||||
runSqlDBL conn act = foldFree (interpretSqlDBF conn) act
|
25
src/Hydra/Core/SqlDB/Language.hs
Normal file
25
src/Hydra/Core/SqlDB/Language.hs
Normal file
@ -0,0 +1,25 @@
|
||||
{-# LANGUAGE AllowAmbiguousTypes #-}
|
||||
{-# LANGUAGE TemplateHaskell #-}
|
||||
{-# LANGUAGE TypeOperators #-}
|
||||
|
||||
module Hydra.Core.SqlDB.Language where
|
||||
|
||||
import Hydra.Prelude
|
||||
|
||||
import qualified Hydra.Core.Domain.DB as D
|
||||
import qualified Hydra.Core.Domain.SQLDB as D
|
||||
|
||||
import Language.Haskell.TH.MakeFunctor (makeFunctorInstance)
|
||||
|
||||
import Database.Beam
|
||||
import Database.Beam.Sqlite
|
||||
|
||||
data SqlDBF next where
|
||||
RunBeam :: String -> (D.DBResult a -> next) -> SqlDBF next
|
||||
|
||||
makeFunctorInstance ''SqlDBF
|
||||
|
||||
type SqlDBL db = Free SqlDBF
|
||||
|
||||
rawQuery :: String -> SqlDBL (D.DBResult a)
|
||||
rawQuery rawQuery = liftF $ RawQuery rawQuery id
|
41
src/Hydra/Core/SqlDBRuntime.hs
Normal file
41
src/Hydra/Core/SqlDBRuntime.hs
Normal file
@ -0,0 +1,41 @@
|
||||
module Hydra.Core.SqlDBRuntime where
|
||||
|
||||
import Hydra.Prelude
|
||||
|
||||
import qualified Data.Map as Map
|
||||
|
||||
import qualified Hydra.Core.Domain as D
|
||||
import qualified Hydra.Core.Language as L
|
||||
|
||||
import qualified Database.SQLite.Simple as SQLite
|
||||
|
||||
|
||||
type SQLiteDBConn = MVar SQLite.Connection
|
||||
type SQLiteDBConns = TMVar (Map D.DBName SQLiteDBConn)
|
||||
|
||||
initSQLiteDB'
|
||||
:: SQLiteDBConns
|
||||
-> D.SqlDBConfig
|
||||
-> IO (D.DBResult D.SqlDBHandle)
|
||||
initSQLiteDB' connsVar cfg@(D.SQLiteConfig dbName) = do
|
||||
eConn <- try $ SQLite.open dbName
|
||||
case eConn of
|
||||
Left (err :: SomeException) -> pure $ Left $ D.DBError D.SystemError $ show err
|
||||
Right conn -> do
|
||||
dbM <- newMVar conn
|
||||
atomically $ do
|
||||
conns <- takeTMVar connsVar
|
||||
putTMVar connsVar $ Map.insert dbName dbM conns
|
||||
pure $ Right $ D.SQLiteHandle D.SQLite dbName
|
||||
|
||||
deInitSQLiteDB :: SQLiteDBConn -> IO ()
|
||||
deInitSQLiteDB connVar = do
|
||||
conn <- takeMVar connVar
|
||||
void $ try $ SQLite.close conn
|
||||
putMVar connVar conn
|
||||
|
||||
closeSQLiteDBs :: SQLiteDBConns -> IO ()
|
||||
closeSQLiteDBs handleMapVar = do
|
||||
handleMap <- atomically $ takeTMVar handleMapVar
|
||||
mapM_ deInitSQLiteDB $ Map.elems handleMap
|
||||
atomically $ putTMVar handleMapVar Map.empty
|
@ -10,12 +10,14 @@ import qualified Hydra.Core.Language as L
|
||||
import qualified Hydra.Core.RLens as RLens
|
||||
import qualified Hydra.Core.Runtime as R
|
||||
import qualified Hydra.Core.KVDBRuntime as R
|
||||
import qualified Hydra.Core.SqlDBRuntime as R
|
||||
import qualified Hydra.Framework.Language as L
|
||||
import qualified Hydra.Framework.RLens as RLens
|
||||
import qualified Hydra.Framework.Runtime as R
|
||||
|
||||
import qualified Database.RocksDB as Rocks
|
||||
import qualified Database.Redis as Redis
|
||||
import qualified Database.SQLite.Simple as SQLite
|
||||
|
||||
langRunner :: R.CoreRuntime -> Impl.LangRunner L.LangL
|
||||
langRunner coreRt = Impl.LangRunner (Impl.runLangL coreRt)
|
||||
@ -26,6 +28,10 @@ initKVDB' coreRt cfg@(D.RocksDBConfig _ _ _) dbName =
|
||||
initKVDB' coreRt cfg@(D.RedisConfig) dbName =
|
||||
R.initRedisDB' (coreRt ^. RLens.redisConns) cfg dbName
|
||||
|
||||
initSqlDB' :: R.CoreRuntime -> D.SqlDBConfig -> IO (D.DBResult D.SqlDBHandle)
|
||||
initSqlDB' coreRt cfg@(D.SQLiteConfig dbName) =
|
||||
R.initSQLiteDB' (coreRt ^. RLens.sqliteConns) cfg
|
||||
|
||||
interpretAppF :: R.AppRuntime -> L.AppF a -> IO a
|
||||
interpretAppF appRt (L.EvalLang action next) = do
|
||||
let coreRt = appRt ^. RLens.coreRuntime
|
||||
@ -37,7 +43,11 @@ interpretAppF appRt (L.EvalProcess action next) = do
|
||||
res <- Impl.runProcessL (langRunner coreRt) (coreRt ^. RLens.processRuntime) action
|
||||
pure $ next res
|
||||
|
||||
interpretAppF appRt (L.InitKVDB cfg dbName next) = next <$> initKVDB' (appRt ^. RLens.coreRuntime) cfg dbName
|
||||
interpretAppF appRt (L.InitKVDB cfg dbName next) =
|
||||
next <$> initKVDB' (appRt ^. RLens.coreRuntime) cfg dbName
|
||||
|
||||
interpretAppF appRt (L.InitSqlDB cfg next) =
|
||||
next <$> initSqlDB' (appRt ^. RLens.coreRuntime) cfg
|
||||
|
||||
runAppL :: R.AppRuntime -> L.AppL a -> IO a
|
||||
runAppL appRt = foldFree (interpretAppF appRt)
|
||||
|
@ -27,6 +27,8 @@ data AppF next where
|
||||
-- TODO: add explicit deinit.
|
||||
-- DeinitKVDB :: D.DB db => D.DBHandle db -> (D.DBResult Bool -> next) -> AppF next
|
||||
|
||||
InitSqlDB :: D.SqlDBConfig -> (D.DBResult D.SqlDBHandle -> next) -> AppF next
|
||||
|
||||
makeFunctorInstance ''AppF
|
||||
|
||||
type AppL = Free AppF
|
||||
|
Loading…
Reference in New Issue
Block a user