Added builderCount

This commit is contained in:
iko 2020-02-24 14:20:49 +03:00
parent 4597fdc5d8
commit 1b7e15557d
4 changed files with 49 additions and 31 deletions

View File

@ -10,6 +10,7 @@ module Control.Task.Scheduler.Query
where
import Colog
import Control.Arrow
import Control.Monad.IO.Unlift
import Control.Squeal
import Control.Task
@ -38,6 +39,7 @@ scheduleTaskQuery task = do
Set (param @1) `as` #task
:* Set (param @2) `as` #payload
:* Default `as` #creation_time
:* Default `as` #started
)
( OnConflict
(OnConstraint #pk_task_payload)
@ -82,7 +84,9 @@ rescheduleTaskQuery payload = do
query =
update_
#tasks
(Default `as` #creation_time)
( Default `as` #creation_time
:* Default `as` #started
)
(param @1 .== #task .&& param @2 .== #payload)
dbWrite (const $ return ()) $
manipulateParams_
@ -106,11 +110,20 @@ pickTasksQuery tasks limitCount = do
)
( from (table #tasks)
& where_ (#task `in_` (literal <$> tasks))
& orderBy [#creation_time & Asc]
& where_ (not_ . notNull $ #started)
& limit limitCount
)
dbRead (const $ return []) $
runQuery query >>= getRows
startedMan :: Manipulation_ Schema (String, Jsonb Value) ()
startedMan =
update_
#tasks
( Set true `as` #started
)
(param @1 .== #task .&& param @2 .== #payload)
dbWrite (const $ return []) $ do
pickedTasks <- runQuery query >>= getRows
traversePrepared_ startedMan $ (task &&& payload) <$> pickedTasks
return pickedTasks
data PickedTask
= PickedTask

View File

@ -193,13 +193,13 @@ runServer = do
repeatIfNotEmpty n f
printingErrors =
handleAny (logError . T.pack . displayException >=> const (return 0))
_ <-
forkIO . repeatIfNotEmpty (10 ^ (6 :: Int))
replicateM_ builderCount
$ forkIO . repeatIfNotEmpty (10 ^ (6 :: Int))
. usingConnectionPool pool
. (`runReaderT` serverData)
. unApp
. printingErrors
$ runTasks @'[Build] 1
$ runTasks @'[Build] 1
_ <-
forkIO . repeatIfNotEmpty (10 ^ (6 :: Int))
. usingConnectionPool pool

View File

@ -31,7 +31,8 @@ data Config
baseSiteUrl :: !String,
cfgDockerUrl :: !String,
cfgDockerFile :: !FilePath,
logSeverity :: !Severity
logSeverity :: !Severity,
builderCount :: !Int
}
deriving (Show, Generic)
@ -58,7 +59,8 @@ defaultConfig =
baseSiteUrl = "http://localhost:8080",
cfgDockerUrl = "http://localhost:1234",
cfgDockerFile = "TmpDockerfile",
logSeverity = Info
logSeverity = Info,
builderCount = 2
}
getConfig :: IO Config

View File

@ -18,7 +18,8 @@ type SchemaV1 =
]
:=> '[ "task" ::: 'NoDef :=> 'NotNull (PG Text),
"payload" ::: 'NoDef :=> 'NotNull (PG (Jsonb Value)),
"creation_time" ::: 'Def :=> 'NotNull (PG UTCTime)
"creation_time" ::: 'Def :=> 'NotNull (PG UTCTime),
"started" ::: 'Def :=> 'NotNull (PG Bool)
]
),
"submissions"
@ -34,28 +35,30 @@ type SchemaV1 =
]
schemaMigrationV1 :: Migration Definition (Public '[]) SchemaV1
schemaMigrationV1 = Migration
{ name = "v1",
up =
createTable
#tasks
( notNullable text `as` #task
:* notNullable jsonb `as` #payload
:* default_ now (notNullable timestampWithTimeZone) `as` #creation_time
)
(primaryKey (#task :* #payload) `as` #pk_task_payload)
>>> createTable
#submissions
( notNullable text `as` #user_name
:* notNullable text `as` #repo_full_name
:* notNullable text `as` #problem
:* notNullable text `as` #sha
:* notNullable jsonb `as` #status
schemaMigrationV1 =
Migration
{ name = "v1",
up =
createTable
#tasks
( notNullable text `as` #task
:* notNullable jsonb `as` #payload
:* default_ now (notNullable timestampWithTimeZone) `as` #creation_time
:* default_ false (notNullable bool) `as` #started
)
(primaryKey (#repo_full_name :* #sha) `as` #pk_submission_repo_full_name_sha)
>>> createIndexes,
down = dropTable #tasks >>> dropTable #submissions
}
(primaryKey (#task :* #payload) `as` #pk_task_payload)
>>> createTable
#submissions
( notNullable text `as` #user_name
:* notNullable text `as` #repo_full_name
:* notNullable text `as` #problem
:* notNullable text `as` #sha
:* notNullable jsonb `as` #status
)
(primaryKey (#repo_full_name :* #sha) `as` #pk_submission_repo_full_name_sha)
>>> createIndexes,
down = dropTable #tasks >>> dropTable #submissions
}
where
createIndexes :: Definition sch sch
createIndexes =