This commit is contained in:
Mark Eibes 2020-01-20 21:56:40 +01:00
parent 8b4e7660bd
commit bbaf3fbd35
16 changed files with 554 additions and 150 deletions

1
.gitignore vendored
View File

@ -1,3 +1,4 @@
.history/
client/bower_components/
client/node_modules/
client/.pulp-cache/

View File

@ -3580,9 +3580,9 @@
}
},
"buffer": {
"version": "4.9.1",
"resolved": "https://registry.npmjs.org/buffer/-/buffer-4.9.1.tgz",
"integrity": "sha1-bRu2AbB6TvztlwlBMgkwJ8lbwpg=",
"version": "4.9.2",
"resolved": "https://registry.npmjs.org/buffer/-/buffer-4.9.2.tgz",
"integrity": "sha512-xq+q3SRMOxGivLhBNaUdC64hDTQwejJ+H0T/NB1XMtTVEwNTrfFF3gAxiyW0Bu/xWEGhjVKgUcMhCrUy2+uCWg==",
"dev": true,
"requires": {
"base64-js": "^1.0.2",
@ -4206,13 +4206,10 @@
"dev": true
},
"console-browserify": {
"version": "1.1.0",
"resolved": "https://registry.npmjs.org/console-browserify/-/console-browserify-1.1.0.tgz",
"integrity": "sha1-8CQcRXMKn8YyOyBtvzjtx0HQuxA=",
"dev": true,
"requires": {
"date-now": "^0.1.4"
}
"version": "1.2.0",
"resolved": "https://registry.npmjs.org/console-browserify/-/console-browserify-1.2.0.tgz",
"integrity": "sha512-ZMkYO/LkF17QvCPqM0gxw8yUzigAOZOSWSHg91FH6orS7vcEj5dVZTidN2fQ14yBSdg97RqhSNwLUXInd52OTA==",
"dev": true
},
"console-control-strings": {
"version": "1.1.0",
@ -4612,12 +4609,6 @@
"whatwg-url": "^7.0.0"
}
},
"date-now": {
"version": "0.1.4",
"resolved": "https://registry.npmjs.org/date-now/-/date-now-0.1.4.tgz",
"integrity": "sha1-6vQ5/U1ISK105cx9vvIAZyueNFs=",
"dev": true
},
"debug": {
"version": "3.2.6",
"resolved": "https://registry.npmjs.org/debug/-/debug-3.2.6.tgz",
@ -4847,9 +4838,9 @@
"dev": true
},
"des.js": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/des.js/-/des.js-1.0.0.tgz",
"integrity": "sha1-wHTS4qpqipoH29YfmhXCzYPsjsw=",
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/des.js/-/des.js-1.0.1.tgz",
"integrity": "sha512-Q0I4pfFrv2VPd34/vfLrFOoRmlYj3OV50i7fskps1jZWK1kApMWWT9G6RRUeYedLcBDIhnSDaUvJMb3AhUlaEA==",
"dev": true,
"requires": {
"inherits": "^2.0.1",
@ -5129,9 +5120,9 @@
}
},
"elliptic": {
"version": "6.5.1",
"resolved": "https://registry.npmjs.org/elliptic/-/elliptic-6.5.1.tgz",
"integrity": "sha512-xvJINNLbTeWQjrl6X+7eQCrIy/YPv5XCpKW6kB5mKvtnGILoLDcySuwomfdzt0BMdLNVnuRNTuzKNHj0bva1Cg==",
"version": "6.5.2",
"resolved": "https://registry.npmjs.org/elliptic/-/elliptic-6.5.2.tgz",
"integrity": "sha512-f4x70okzZbIQl/NSRLkI/+tteV/9WqL98zx+SQ69KbXxmVrmjwsNUPn/gYJJ0sHvEak24cZgHIPegRePAtA/xw==",
"dev": true,
"requires": {
"bn.js": "^4.4.0",
@ -5361,9 +5352,9 @@
"dev": true
},
"events": {
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/events/-/events-3.0.0.tgz",
"integrity": "sha512-Dc381HFWJzEOhQ+d8pkNon++bk9h6cdAoAj4iE6Q4y6xgTzySWXlKn05/TVNpjnfRqi/X0EpJEJohPjNI3zpVA==",
"version": "3.1.0",
"resolved": "https://registry.npmjs.org/events/-/events-3.1.0.tgz",
"integrity": "sha512-Rv+u8MLHNOdMjTAFeT3nCjHn2aGlx435FP/sDHNaRhDEMwyI/aB22Kj2qIN8R0cw3z28psEQLYwxVKLsKrMgWg==",
"dev": true
},
"eventsource": {
@ -11235,9 +11226,9 @@
}
},
"serialize-javascript": {
"version": "1.9.1",
"resolved": "https://registry.npmjs.org/serialize-javascript/-/serialize-javascript-1.9.1.tgz",
"integrity": "sha512-0Vb/54WJ6k5v8sSWN09S0ora+Hnr+cX40r9F170nT+mSkaxltoE/7R3OrIdBSUv1OoiobH1QoWQbCnAO+e8J1A==",
"version": "2.1.2",
"resolved": "https://registry.npmjs.org/serialize-javascript/-/serialize-javascript-2.1.2.tgz",
"integrity": "sha512-rs9OggEUF0V4jUSecXazOYsLfu7OGK2qIn3c7IPBiffz32XniEp/TX9Xmc9LQfK2nQ2QKHvZ2oygKUGU0lG4jQ==",
"dev": true
},
"serve-favicon": {
@ -11699,9 +11690,9 @@
}
},
"source-map-support": {
"version": "0.5.13",
"resolved": "https://registry.npmjs.org/source-map-support/-/source-map-support-0.5.13.tgz",
"integrity": "sha512-SHSKFHadjVA5oR4PPqhtAVdcBWwRYVd6g6cAXnIbRiIwc2EhPrTuKUBdSLvlEKyIP3GCf89fltvcZiP9MMFA1w==",
"version": "0.5.16",
"resolved": "https://registry.npmjs.org/source-map-support/-/source-map-support-0.5.16.tgz",
"integrity": "sha512-efyLRJDr68D9hBBNIPWFjhpFzURh+KJykQwvMyW5UiZzYwoF6l4YMMDIJJEyFWxWCqfyxLzz6tSfUFR+kXXsVQ==",
"dev": true,
"requires": {
"buffer-from": "^1.0.0",
@ -12235,9 +12226,9 @@
}
},
"terser": {
"version": "4.3.3",
"resolved": "https://registry.npmjs.org/terser/-/terser-4.3.3.tgz",
"integrity": "sha512-Nzr7dpRjSzMEUS+z2UYQBtzE0LDm5k0Yy8RgLRPy85QUo1TjU5lIOBwzS5/FVAMaVyHZ3WTTU2BuQcMn8KXnNQ==",
"version": "4.6.3",
"resolved": "https://registry.npmjs.org/terser/-/terser-4.6.3.tgz",
"integrity": "sha512-Lw+ieAXmY69d09IIc/yqeBqXpEQIpDGZqT34ui1QWXIUpR2RjbqEkT8X7Lgex19hslSqcWM5iMN2kM11eMsESQ==",
"dev": true,
"requires": {
"commander": "^2.20.0",
@ -12254,16 +12245,16 @@
}
},
"terser-webpack-plugin": {
"version": "1.4.1",
"resolved": "https://registry.npmjs.org/terser-webpack-plugin/-/terser-webpack-plugin-1.4.1.tgz",
"integrity": "sha512-ZXmmfiwtCLfz8WKZyYUuuHf3dMYEjg8NrjHMb0JqHVHVOSkzp3cW2/XG1fP3tRhqEqSzMwzzRQGtAPbs4Cncxg==",
"version": "1.4.3",
"resolved": "https://registry.npmjs.org/terser-webpack-plugin/-/terser-webpack-plugin-1.4.3.tgz",
"integrity": "sha512-QMxecFz/gHQwteWwSo5nTc6UaICqN1bMedC5sMtUc7y3Ha3Q8y6ZO0iCR8pq4RJC8Hjf0FEPEHZqcMB/+DFCrA==",
"dev": true,
"requires": {
"cacache": "^12.0.2",
"find-cache-dir": "^2.1.0",
"is-wsl": "^1.1.0",
"schema-utils": "^1.0.0",
"serialize-javascript": "^1.7.0",
"serialize-javascript": "^2.1.2",
"source-map": "^0.6.1",
"terser": "^4.1.2",
"webpack-sources": "^1.4.0",
@ -12862,9 +12853,9 @@
}
},
"vm-browserify": {
"version": "1.1.0",
"resolved": "https://registry.npmjs.org/vm-browserify/-/vm-browserify-1.1.0.tgz",
"integrity": "sha512-iq+S7vZJE60yejDYM0ek6zg308+UZsdtPExWP9VZoCFCz1zkJoXFnAX7aZfd/ZwrkidzdUZL0C/ryW+JwAiIGw==",
"version": "1.1.2",
"resolved": "https://registry.npmjs.org/vm-browserify/-/vm-browserify-1.1.2.tgz",
"integrity": "sha512-2ham8XPWTONajOR0ohOKOHXkm3+gaBmGut3SRuu75xLd/RRaY6vqgh8NBYYk7+RW3u5AtzPQZG8F10LHkl0lAQ==",
"dev": true
},
"w3c-hr-time": {
@ -12929,9 +12920,9 @@
"dev": true
},
"webpack": {
"version": "4.41.0",
"resolved": "https://registry.npmjs.org/webpack/-/webpack-4.41.0.tgz",
"integrity": "sha512-yNV98U4r7wX1VJAj5kyMsu36T8RPPQntcb5fJLOsMz/pt/WrKC0Vp1bAlqPLkA1LegSwQwf6P+kAbyhRKVQ72g==",
"version": "4.41.5",
"resolved": "https://registry.npmjs.org/webpack/-/webpack-4.41.5.tgz",
"integrity": "sha512-wp0Co4vpyumnp3KlkmpM5LWuzvZYayDwM2n17EHFr4qxBBbRokC7DJawPJC7TfSFZ9HZ6GsdH40EBj4UV0nmpw==",
"dev": true,
"requires": {
"@webassemblyjs/ast": "1.8.5",
@ -12954,7 +12945,7 @@
"node-libs-browser": "^2.2.1",
"schema-utils": "^1.0.0",
"tapable": "^1.1.3",
"terser-webpack-plugin": "^1.4.1",
"terser-webpack-plugin": "^1.4.3",
"watchpack": "^1.6.0",
"webpack-sources": "^1.4.1"
}

View File

@ -1,14 +0,0 @@
module project.localhost/example
replace project.localhost/purescript-native/ffi-loader => ./purescript-native
replace project.localhost/purescript-native/output => ./output
go 1.13
require (
github.com/purescript-native/go-ffi v0.0.0-20191015034244-22b13919279c // indirect
github.com/purescript-native/go-runtime v0.1.0 // indirect
project.localhost/purescript-native/ffi-loader v0.0.0-00010101000000-000000000000 // indirect
project.localhost/purescript-native/output v0.0.0-00010101000000-000000000000 // indirect
)

3
playground/package-lock.json generated Normal file
View File

@ -0,0 +1,3 @@
{
"lockfileVersion": 1
}

View File

@ -1,8 +1,5 @@
module Main where
import Prelude
import Effect.Console
main = log message
message = "Du bist " <> show 7 <> " Jahre alt."
main = log "Hi from Insomnia!"

View File

@ -17,4 +17,5 @@
},
],
"purescript.buildCommand": "spago build --purs-args '--json-errors'",
"vscode_custom_css.imports": ["file:///Users/mark/.vscode-oss/extensions/webrender.synthwave-x-fluoromachine-0.0.9/synthwave-x-fluoromachine.css"]
}

View File

@ -365,6 +365,11 @@
"resolved": "https://registry.npmjs.org/utils-merge/-/utils-merge-1.0.1.tgz",
"integrity": "sha1-n5VxD1CiZ5R7LMwSR0HBAoQn5xM="
},
"uuid": {
"version": "3.4.0",
"resolved": "https://registry.npmjs.org/uuid/-/uuid-3.4.0.tgz",
"integrity": "sha512-HjSDRw6gZE5JMggctHBcjVak08+KEVhSIiDzFnT9S9aegmp85S/bReBVTb4QTFaRNptJ9kuYaNhnbNEOkbKb/A=="
},
"vary": {
"version": "1.1.2",
"resolved": "https://registry.npmjs.org/vary/-/vary-1.1.2.tgz",

View File

@ -14,6 +14,7 @@
"license": "ISC",
"dependencies": {
"body-parser": "^1.19.0",
"express": "^4.17.1"
"express": "^4.17.1",
"uuid": "^3.4.0"
}
}

View File

@ -2,10 +2,11 @@
Welcome to a Spago project!
You can edit this file as you like.
-}
{ name =
"my-project"
{ name = "my-project"
, dependencies =
[ "console"
[ "avar"
, "console"
, "debug"
, "effect"
, "express"
, "maybe"
@ -15,10 +16,11 @@ You can edit this file as you like.
, "node-process"
, "psci-support"
, "simple-json"
, "spec"
, "spec-discovery"
, "stringutils"
, "uuid"
]
, packages =
./packages.dhall
, sources =
[ "src/**/*.purs", "test/**/*.purs" ]
, packages = ./packages.dhall
, sources = [ "src/**/*.purs", "test/**/*.purs" ]
}

186
server/src/JobQueue.purs Normal file
View File

@ -0,0 +1,186 @@
module JobQueue where
import Prelude
import Data.Array (delete, filter, find, length, snoc, zip)
import Data.Foldable (for_)
import Data.Generic.Rep (class Generic)
import Data.Generic.Rep.Show (genericShow)
import Data.JSDate (JSDate)
import Data.JSDate as JSDate
import Data.Maybe (Maybe(..))
import Data.Newtype (class Newtype, over, un)
import Data.Time.Duration (Milliseconds(..))
import Data.Tuple.Nested ((/\))
import Data.UUID (UUID, genUUID)
import Effect (Effect)
import Effect.Aff (Aff, Fiber, delay, error, killFiber, launchAff, launchAff_, parallel, sequential)
import Effect.Class (class MonadEffect, liftEffect)
import Effect.Ref (Ref)
import Effect.Ref as Ref
newtype JobId
= JobId UUID
derive instance ntJobId :: Newtype JobId _
derive instance genericJobId :: Generic JobId _
instance showJobId :: Show JobId where
show = genericShow
derive newtype instance jobIdEq :: Eq JobId
newtype NewJob a
= NewJob (a -> Aff Unit)
data PendingJob a
= PendingJob
{ id ∷ JobId
, job ∷ a -> Aff Unit
, addedAt ∷ JSDate
}
data Job a
= Pending (PendingJob a)
| Running (RunningJob a)
getAddedAt ∷ ∀ a. Job a -> JSDate
getAddedAt = case _ of
Pending (PendingJob { addedAt }) -> addedAt
Running (RunningJob { addedAt }) -> addedAt
instance eqJob :: Eq (Job a) where
eq j1 j2 = getId j1 == getId j2
instance ordJob :: Ord (Job a) where
compare j1 j2 = compare (getAddedAt j1) (getAddedAt j2)
getId ∷ ∀ a. Job a -> JobId
getId = case _ of
Pending (PendingJob { id }) -> id
Running (RunningJob { id }) -> id
newtype RunningJob a
= RunningJob
{ id ∷ JobId
, addedAt ∷ JSDate
, startedAt ∷ JSDate
, fiber ∷ Fiber Unit
, resource ∷ a
}
type QueueParams
= { maxSize ∷ Int
, timeout ∷ Milliseconds
}
newtype ResourcePool a
= ResourcePool (Array a)
derive instance ntResourcePool :: Newtype (ResourcePool a) _
data Queue a
= Queue QueueParams (Ref (ResourcePool a)) (Ref (Array (Job a)))
mkQueue ∷ ∀ m a. MonadEffect m => QueueParams -> ResourcePool a -> m (Queue a)
mkQueue params pool =
liftEffect do
poolRef <- Ref.new pool
jobsRef <- Ref.new []
pure $ Queue params poolRef jobsRef
data EnqueueResult
= Enqueued JobId
| QueueFull
derive instance eqEnqueueResult :: Eq EnqueueResult
derive instance genericEnqueueResult :: Generic EnqueueResult _
instance showEnqueueResult :: Show EnqueueResult where
show = genericShow
startJob ∷ ∀ a. Eq a => Queue a -> a -> PendingJob a -> Effect Unit
startJob q@(Queue { timeout } _ jobsRef) resource (PendingJob { id, job, addedAt }) = do
startedAt <- JSDate.now
iso <- JSDate.toISOString startedAt
-- log $ "Starting Job: " <> toString (unwrap id) <> " at: " <> iso
let
remove = removeFromQueue id q
fiber <-
launchAff <<< sequential
$ parallel (job resource *> remove)
*> parallel (delay timeout *> remove)
let
running = RunningJob { id, fiber, addedAt, startedAt, resource }
jobsRef
# Ref.modify_ (map \j -> if getId j == id then Running running else j)
jobsInQueue ∷ ∀ a. Queue a -> Effect Int
jobsInQueue (Queue _ _ jobsRef) = do
jobs <- Ref.read jobsRef
pure $ length jobs
runningJobs ∷ ∀ a. Queue a -> Effect (Array (RunningJob a))
runningJobs (Queue _ resourceRef jobsRef) = do
jobs <- Ref.read jobsRef
pure do
job <- jobs
case job of
Running rj -> [ rj ]
Pending _ -> []
pendingJobs ∷ ∀ a. Queue a -> Effect (Array (PendingJob a))
pendingJobs (Queue _ resourceRef jobsRef) = do
jobs <- Ref.read jobsRef
pool <- Ref.read resourceRef
pure do
job <- jobs
case job of
Running _ -> []
Pending pj -> [ pj ]
startJobs ∷ ∀ a. Eq a => Queue a -> Effect Unit
startJobs q@(Queue { timeout } poolRef jobsRef) = do
pool <- Ref.read poolRef
pending <- pendingJobs q
for_ (pending `zip` un ResourcePool pool) \(job /\ resource) -> do
Ref.modify_ (over ResourcePool $ delete resource) poolRef
startJob q resource job
removeStaleJobs ∷ ∀ a. Eq a => Queue a -> Effect Unit
removeStaleJobs q@(Queue { timeout } resourceRef jobsRef) = do
running <- runningJobs q
nowMillis <- JSDate.now <#> JSDate.getTime
for_ running
$ \(RunningJob { startedAt, id }) -> do
let
runningSince = (nowMillis - JSDate.getTime startedAt) # Milliseconds
timedOut = runningSince > timeout
when timedOut $ launchAff_ (removeFromQueue id q)
removeFromQueue ∷ ∀ a. Eq a => JobId -> Queue a -> Aff Unit
removeFromQueue id q@(Queue _ poolRef jobsRef) = do
maybeJob <- Ref.read jobsRef <#> find (\x -> (getId x) == id) # liftEffect
-- remove from queue
jobsRef # Ref.modify_ (filter (\x -> (getId x) /= id)) # liftEffect
case maybeJob of
Just (Running (RunningJob { fiber })) -> killFiber (error "Removed from queue") fiber
_ -> pure unit
-- free resource
case maybeJob of
Just (Running (RunningJob { resource })) -> do
poolRef # Ref.modify_ (over ResourcePool (_ `snoc` resource)) # liftEffect
_ -> pure unit
startJobs q # liftEffect
--| Add to the end of the queue. If the queue is full the result will be `false`
enqueue ∷ ∀ a. Eq a => NewJob a -> Queue a -> Effect EnqueueResult
enqueue (NewJob newJob) q@(Queue { maxSize, timeout } _ jobsRef) = do
id <- genUUID <#> JobId
jobs <- Ref.read jobsRef
if length jobs >= maxSize then
pure QueueFull
else do
addedAt <- JSDate.now
let
new = Pending (PendingJob { id, addedAt, job: newJob })
jobsRef # Ref.modify_ (_ `snoc` new)
startJobs q
pure (Enqueued id)

View File

@ -0,0 +1,130 @@
module JobQueueSpec where
import Prelude
import Data.Time.Duration (class Duration, Seconds(..), fromDuration)
import Effect.Aff (Milliseconds(..), delay)
import Effect.Class (liftEffect)
import Effect.Class.Console (log)
import Effect.Ref (Ref)
import Effect.Ref as Ref
import JobQueue (EnqueueResult(..), NewJob(..), QueueParams, ResourcePool(..), mkQueue)
import JobQueue as Q
import Main (Folder(..), execCommand)
import Test.Spec (Spec, describe, it)
import Test.Spec.Assertions (shouldEqual, shouldNotEqual)
params ∷ QueueParams
params =
{ maxSize: 10
, timeout: (1.0 # Seconds) # fromDuration
}
mkJob ∷ String -> Number -> NewJob Unit
mkJob msg ms = NewJob \_ -> (delay (ms # Milliseconds) *> log msg)
job1 ∷ NewJob Unit
job1 = mkJob "Job 1" 10.0
writeRefJob ∷ ∀ a d. Show a => Duration d => Ref a -> a -> d -> NewJob Unit
writeRefJob ref a d =
NewJob \_ -> do
let
millis ∷ Milliseconds
millis = fromDuration d
delay millis
Ref.write a ref # liftEffect
pool :: ResourcePool Unit
pool = ResourcePool [unit, unit, unit]
spec ∷ Spec Unit
spec = do
describe "The Queue" do
it "can't enqueue if maxSize is 0" do
q <- mkQueue (params { maxSize = 0 }) pool
r <- Q.enqueue job1 q # liftEffect
r `shouldEqual` QueueFull
it "can enqueue if maxSize is 1" do
q <- mkQueue (params { maxSize = 1 }) pool
r <- Q.enqueue job1 q # liftEffect
r `shouldNotEqual` QueueFull
it "runsJobs" do
q <- mkQueue (params { maxSize = 1 }) pool
ref <- Ref.new false # liftEffect
r <- Q.enqueue (writeRefJob ref true (0.0 # Seconds)) q # liftEffect
r `shouldNotEqual` QueueFull
delay (5.0 # Milliseconds # fromDuration)
res <- Ref.read ref # liftEffect
res `shouldEqual` true
it "kills slow jobs" do
q <- mkQueue (params { maxSize = 1, timeout = (3.0 # Milliseconds) }) pool
ref <- Ref.new false # liftEffect
r <- Q.enqueue (writeRefJob ref true (4.0 # Milliseconds)) q # liftEffect
r `shouldNotEqual` QueueFull
res <- Ref.read ref # liftEffect
delay (9.0 # Milliseconds)
res `shouldEqual` false
it "runs multiple jobs" do
q <- mkQueue (params { maxSize = 3, timeout = (2.0 # Seconds # fromDuration) }) (ResourcePool [unit])
ref <- Ref.new 0 # liftEffect
let
enq n delay = Q.enqueue (writeRefJob ref n delay) q # liftEffect
r₁ <- enq 1 (0.1 # Seconds)
r₁ `shouldNotEqual` QueueFull
r₂ <- enq 2 (0.2 # Seconds)
r₂ `shouldNotEqual` QueueFull
r₃ <- enq 3 (0.3 # Seconds)
r₃ `shouldNotEqual` QueueFull
Ref.read ref # liftEffect >>= shouldEqual 0 -- ~0ms
delay (0.1 # Seconds # fromDuration)
Ref.read ref # liftEffect >>= shouldEqual 1 -- ~150ms
delay (0.2 # Seconds # fromDuration)
Ref.read ref # liftEffect >>= shouldEqual 2 -- ~250ms
delay (0.3 # Seconds # fromDuration)
Ref.read ref # liftEffect >>= shouldEqual 3 -- ~350ms
it "respects pool size" do
q <- mkQueue (params { maxSize = 3, timeout = (2.0 # Seconds # fromDuration) }) (ResourcePool [unit])
ref <- Ref.new 0 # liftEffect
let
enq n delay = Q.enqueue (writeRefJob ref n delay) q # liftEffect
r₁ <- enq 1 (0.1 # Seconds)
r₁ `shouldNotEqual` QueueFull
r₂ <- enq 2 (0.1 # Seconds)
r₂ `shouldNotEqual` QueueFull
r₃ <- enq 3 (0.1 # Seconds)
r₃ `shouldNotEqual` QueueFull
Ref.read ref # liftEffect >>= shouldEqual 0 -- ~0ms
delay (0.1 # Seconds # fromDuration)
Ref.read ref # liftEffect >>= shouldEqual 1 -- ~150ms
delay (0.2 # Seconds # fromDuration)
Ref.read ref # liftEffect >>= shouldEqual 2 -- ~250ms
delay (0.3 # Seconds # fromDuration)
Ref.read ref # liftEffect >>= shouldEqual 3 -- ~350ms
it "respects pool size (2)" do
q <- mkQueue (params { maxSize = 3, timeout = (2.0 # Seconds # fromDuration) }) (ResourcePool [unit, unit, unit])
ref <- Ref.new 0 # liftEffect
let
enq n delay = Q.enqueue (writeRefJob ref n delay) q # liftEffect
r₁ <- enq 1 (0.1 # Seconds)
r₂ <- enq 2 (0.1 # Seconds)
r₃ <- enq 3 (0.1 # Seconds)
r₁ `shouldNotEqual` QueueFull
r₂ `shouldNotEqual` QueueFull
r₃ `shouldNotEqual` QueueFull
Ref.read ref # liftEffect >>= shouldEqual 0 -- ~0ms
delay (0.15 # Seconds # fromDuration)
Ref.read ref # liftEffect >>= shouldEqual 3 -- ~150ms
it "can kill processes" do
q <- mkQueue (params { maxSize = 1, timeout = (1.0 # Seconds # fromDuration) }) (ResourcePool [unit])
resultRef <- Ref.new false # liftEffect
let
job =
NewJob \_ -> do
res <- execCommand (Folder ".") "sleep 2"
Ref.write true resultRef # liftEffect
r₁ <- Q.enqueue job q # liftEffect
r₁ `shouldNotEqual` QueueFull
delay (1.3 # Seconds # fromDuration)
result <- Ref.read resultRef # liftEffect
result `shouldEqual` false

View File

@ -1,120 +1,178 @@
module Main where
import Prelude
import Data.Array (unsafeIndex)
import Data.Array (fromFoldable, head, (..))
import Data.Either (Either(..))
import Data.Int (fromString)
import Data.Maybe (Maybe(..), fromMaybe)
import Data.Newtype (class Newtype, un)
import Data.Posix.Signal (Signal(..))
import Data.String.Utils (lines)
import Data.Time.Duration (Seconds(..), fromDuration)
import Effect (Effect)
import Effect.Aff (Aff, effectCanceler, launchAff_, makeAff)
import Effect.AVar as AVar
import Effect.Aff (Aff, Milliseconds(..), bracket, delay, effectCanceler, launchAff_, makeAff)
import Effect.Aff.Class (liftAff)
import Effect.Class (class MonadEffect, liftEffect)
import Effect.Class.Console (info)
import Effect.Class.Console (info, log)
import Effect.Exception (throw)
import Foreign (unsafeToForeign)
import JobQueue (EnqueueResult(..), NewJob(..), Queue, ResourcePool(..))
import JobQueue as Q
import Middleware.JsonBodyParser (jsonBodyParser)
import Node.Buffer (Buffer)
import Node.Buffer as Buffer
import Node.ChildProcess (ExecResult, defaultExecOptions, exec, kill)
import Node.ChildProcess (ExecResult, defaultExecOptions, exec, kill, pid)
import Node.Encoding (Encoding(..))
import Node.Express.App (App, listenHttp, listenHttps, use, useExternal)
import Node.Express.App as E
import Node.Express.Handler (HandlerM)
import Node.Express.Handler (HandlerM(..), Handler)
import Node.Express.Middleware.Static (static)
import Node.Express.Request (getBody')
import Node.Express.Response (setStatus)
import Node.Express.Response as Response
import Node.Express.Types (Request, Response)
import Node.FS.Aff (readTextFile, writeTextFile)
import Node.HTTP (Server)
import Node.OS (numCpus)
import Node.Process (lookupEnv)
import Partial.Unsafe (unsafePartial)
import Shared.Json (readAff)
import Shared.Models.Body (RunResult)
import Shared.Models.Body (RunResult, PursResult)
import Shared.Models.Body as Body
import Simple.JSON (read_, write)
import Simple.JSON as JSON
toBody ∷ ∀ r m. MonadEffect m => { stdout ∷ Buffer, stderr ∷ Buffer | r } -> m RunResult
toBody result = liftEffect $ ado
stdout <- Buffer.toString UTF8 result.stdout
stderr <- Buffer.toString UTF8 result.stderr
let (code ∷ Maybe Int) = asErrorWithCode result >>= _.code
in { code, stdout, stderr } ∷ RunResult
toBody result =
liftEffect
$ ado
stdout <- Buffer.toString UTF8 result.stdout
stderr <- Buffer.toString UTF8 result.stderr
let
(code ∷ Maybe Int) = asErrorWithCode result >>= _.code
in { code, stdout, stderr } ∷ RunResult
type ErrorWithCode = { code ∷ Maybe Int }
type ErrorWithCode
= { code ∷ Maybe Int }
asErrorWithCode ∷ ∀ a. a -> Maybe ErrorWithCode
asErrorWithCode = read_ <<< unsafeToForeign
compileCode ∷ String -> Aff String
compileCode code = do
saveMainFile code
{ stderr } <- execCommand "spago build -- --json-errors"
errorString <- liftEffect $ Buffer.toString UTF8 stderr
pure $ unsafePartial (unsafeIndex (lines errorString) 0)
compileCode ∷ Folder -> String -> Aff PursResult
compileCode folder code = do
saveMainFile folder code
{ stderr } <- execCommand folder "spago build --purs-args \"--json-errors\""
strResult <- liftEffect $ Buffer.toString UTF8 stderr
case checkOutput strResult of
Just r -> pure r
Nothing -> liftEffect $ throw $ "No result in " <> strResult
where
checkOutput output =
head do
line <- lines output
let
(parseable :: Maybe PursResult) = JSON.readJSON_ line
fromFoldable parseable
runCode ∷ Aff ExecResult
runCode = execCommand "node run.js"
runCode ∷ Folder -> Aff ExecResult
runCode folder = execCommand folder "node run.js"
playground ∷ String
playground = "../playground"
saveMainFile ∷ Folder -> String -> Aff Unit
saveMainFile folder code = writeTextFile UTF8 (un Folder folder <> "/src/Main.purs") code
saveMainFile ∷ String -> Aff Unit
saveMainFile code =
writeTextFile UTF8 (playground <> "/src/Main.purs") code
execCommand ∷ String -> Aff ExecResult
execCommand command =
execCommand ∷ Folder -> String -> Aff ExecResult
execCommand folder command =
makeAff \callback -> do
let options = defaultExecOptions { cwd = Just playground }
let
options = defaultExecOptions { cwd = Just (un Folder folder) }
childProcess <- exec command options (callback <<< Right)
pure $ effectCanceler (kill SIGKILL childProcess)
pure $ effectCanceler ((log $ "Killing " <> show (pid childProcess)) *> kill SIGKILL childProcess)
compileHandler ∷ HandlerM Unit
compileHandler = do
newtype Folder
= Folder String
derive instance ntFolder :: Newtype Folder _
derive newtype instance eqFolder :: Eq Folder
compileHandler ∷ Queue Folder -> HandlerM Unit
compileHandler queue = do
body <- getBody'
json <- readAff body # liftAff
result <- compileCode (json ∷ Body.CompileRequest).code # liftAff
Response.send $ write ({ result } ∷ Body.CompileResult)
resultVar <- AVar.empty # liftEffect
let
cleanup ∷ Aff Unit
cleanup = AVar.tryPut Nothing resultVar # liftEffect # void
runHandler ∷ HandlerM Unit
runHandler = do
result <- liftAff do
result <- runCode
toBody result
Response.send $ write (result ∷ Body.RunResult)
compileJob =
NewJob \folder ->
bracket (pure unit) (const cleanup) \_ -> do
result <- compileCode folder (json ∷ Body.CompileRequest).code
AVar.tryPut (Just result) resultVar # liftEffect # void
queueRes <- Q.enqueue compileJob queue # liftEffect
let
getResult ∷ Handler
getResult = do
maybeResult <- AVar.tryRead resultVar # liftEffect
case maybeResult of
Just (Just result) -> Response.send $ write ({ result } ∷ Body.CompileResult)
Just Nothing -> do
setStatus 500
Response.send $ write { error: "Timeout" }
Nothing -> do
delay (100.0 # Milliseconds) # liftAff
getResult
case queueRes of
Enqueued _ -> getResult
QueueFull -> do
setStatus 500
Response.send $ write { error: "Queue full" }
unHandler ∷ ∀ t6. Request -> Response -> Effect Unit -> HandlerM t6 -> Aff t6
unHandler req res next (HandlerM h) = h req res next
runHandler ∷ Queue Folder -> HandlerM Unit
runHandler queue =
HandlerM \req res next -> do
let
handle = unHandler req res next
runJob =
NewJob \folder ->
bracket (pure unit) (const (pure unit)) \_ -> do
result <- runCode folder
resultBody <- toBody result
handle $ Response.send $ write (resultBody ∷ Body.RunResult)
queueRes <- Q.enqueue runJob queue # liftEffect
case queueRes of
Enqueued _ -> pure unit
QueueFull ->
handle do
setStatus 500
Response.send $ write { error: "Queue full" }
main ∷ Effect Unit
main = launchAff_ do
serverSetup makeApp
main =
launchAff_ do
cpus <- numCpus # liftEffect
let
foldersToUse = min 2 (max 15 (cpus - 1))
makeApp ∷ App
makeApp = do
mkFolder = Folder <<< ("../playground" <> _) <<< show
folderPool = ResourcePool $ mkFolder <$> (0 .. foldersToUse)
q <-
Q.mkQueue
{ maxSize: 50
, timeout: 60.0 # Seconds # fromDuration
}
folderPool
serverSetup (makeApp q)
makeApp ∷ Queue Folder -> App
makeApp q = do
use $ static "assets"
useExternal jsonBodyParser
E.post "/compile" compileHandler
E.post "/run" runHandler
-- useOnError (errorHandler log)
-- { method: HTTPure.Post, path: ["_compile"], body } -> do
-- result <- compileCode body
-- HTTPure.ok' (HTTPure.header "Content-Type" "application/json") result
-- { method: HTTPure.Post, path: ["_run"] } -> do
-- result <- runCode
-- response <- toBody result
-- okJson response
-- { method: HTTPure.Get, path } | elem path [[], [""], ["/"]] -> do
-- content <- readTextFile UTF8 "index.html"
-- HTTPure.ok' (HTTPure.header "Content-Type" "text/html") content
-- { method: HTTPure.Get, path } -> do
-- let filePath = intercalate "/" path
-- let mimeType = guessMimeType filePath
-- content <- readTextFile UTF8 filePath
-- HTTPure.ok' (HTTPure.header "Content-Type" mimeType) content
-- _ -> HTTPure.notFound
E.post "/compile" (compileHandler q)
E.post "/run" (runHandler q)
data CertificatesToUse
= UseLocalCertificates
@ -128,14 +186,14 @@ parseCertificatesToUse = case _ of
serverSetup ∷ App -> Aff Server
serverSetup app = do
maybePortString <- lookupEnv "PORT" # liftEffect
let port = maybePortString >>= fromString # fromMaybe 14188
let
port = maybePortString >>= fromString # fromMaybe 14188
useLocalCertificates <- fromMaybe DoNotUseLocalCertificates <<< map parseCertificatesToUse <$> lookupEnv "CERTS" # liftEffect
listen <- case useLocalCertificates of
UseLocalCertificates -> do
httpsOptions <- makeHttpsOptions
pure $ listenHttps app (port ∷ Int) httpsOptions
DoNotUseLocalCertificates ->
pure $ listenHttp app port
DoNotUseLocalCertificates -> pure $ listenHttp app port
liftEffect $ listen \_ -> info $ "psfp server started on port " <> show port
where
makeHttpsOptions = do

2
server/src/Node/OS.js Normal file
View File

@ -0,0 +1,2 @@
var os = require('os')
exports.numCpus = function() { return os.cpus().length }

5
server/src/Node/OS.purs Normal file
View File

@ -0,0 +1,5 @@
module Node.OS where
import Effect (Effect)
foreign import numCpus :: Effect Int

View File

@ -3,9 +3,10 @@ module Test.Main where
import Prelude
import Effect (Effect)
import Effect.Class.Console (log)
import Effect.Aff (launchAff_)
import Test.Spec.Discovery (discover)
import Test.Spec.Reporter (consoleReporter)
import Test.Spec.Runner (runSpec)
main :: Effect Unit
main = do
log "🍝"
log "You should add some tests."
main = launchAff_ $ discover ".*Spec$" >>= runSpec [ consoleReporter ]

View File

@ -2,8 +2,43 @@ module Shared.Models.Body where
import Data.Maybe (Maybe)
type CompileRequest = { code ∷ String }
type CompileRequest
= { code ∷ String }
type CompileResult = { result ∷ String }
type CompileResult
= { result ∷ PursResult }
type RunResult = { code ∷ Maybe Int, stdout ∷ String, stderr ∷ String }
type RunResult
= { code ∷ Maybe Int, stdout ∷ String, stderr ∷ String }
type Suggestion
= { replaceRange :: Position , replacement :: String }
type Span
= { end :: Array Int
, name :: String
, start :: Array Int
}
type Position
= { endColumn :: Int
, endLine :: Int
, startColumn :: Int
, startLine :: Int
}
type ErrorOrWarning
= { allSpans :: Array Span
, errorCode :: String
, errorLink :: String
, filename :: String
, message :: String
, moduleName :: Maybe String
, position :: Position
, suggestion :: Maybe Suggestion
}
type PursResult
= { errors :: Array ErrorOrWarning
, warnings :: Array ErrorOrWarning
}