mirror of
https://github.com/jfischoff/hasql-queue.git
synced 2024-10-05 19:58:32 +03:00
first commit
This commit is contained in:
commit
4d83ac2382
58
.gitignore
vendored
Normal file
58
.gitignore
vendored
Normal file
@ -0,0 +1,58 @@
|
||||
# Compiled source #
|
||||
###################
|
||||
*.com
|
||||
*.class
|
||||
*.dll
|
||||
*.exe
|
||||
*.o
|
||||
*.so
|
||||
*.hi
|
||||
*.lib
|
||||
|
||||
*.aes
|
||||
*.pem
|
||||
# Packages #
|
||||
############
|
||||
# it's better to unpack these files and commit the raw source
|
||||
# git has its own built in compression methods
|
||||
*.7z
|
||||
*.dmg
|
||||
*.gz
|
||||
*.iso
|
||||
*.jar
|
||||
*.rar
|
||||
*.tar
|
||||
*.zip
|
||||
|
||||
# Logs and databases #
|
||||
######################
|
||||
*.log
|
||||
*.sql
|
||||
*.sqlite
|
||||
|
||||
#keys
|
||||
*.pem
|
||||
*.cer
|
||||
*.pem~
|
||||
*.p12
|
||||
|
||||
# OS generated files #
|
||||
######################
|
||||
.DS_Store*
|
||||
ehthumbs.db
|
||||
Icon?
|
||||
Thumbs.db
|
||||
|
||||
*.p_o
|
||||
*.hi
|
||||
*.*~
|
||||
dist
|
||||
tags
|
||||
*.prof
|
||||
|
||||
.cabal-sandbox
|
||||
.stack-work
|
||||
.vagrant
|
||||
bin/
|
||||
core/
|
||||
*.s
|
30
LICENSE
Normal file
30
LICENSE
Normal file
@ -0,0 +1,30 @@
|
||||
Copyright Author name here (c) 2017
|
||||
|
||||
All rights reserved.
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
modification, are permitted provided that the following conditions are met:
|
||||
|
||||
* Redistributions of source code must retain the above copyright
|
||||
notice, this list of conditions and the following disclaimer.
|
||||
|
||||
* Redistributions in binary form must reproduce the above
|
||||
copyright notice, this list of conditions and the following
|
||||
disclaimer in the documentation and/or other materials provided
|
||||
with the distribution.
|
||||
|
||||
* Neither the name of Author name here nor the names of other
|
||||
contributors may be used to endorse or promote products derived
|
||||
from this software without specific prior written permission.
|
||||
|
||||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
||||
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
||||
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
56
db-testing-example.cabal
Normal file
56
db-testing-example.cabal
Normal file
@ -0,0 +1,56 @@
|
||||
name: db-testing-example
|
||||
version: 0.1.0.0
|
||||
-- synopsis:
|
||||
-- description:
|
||||
homepage: https://github.com/githubuser/db-testing-example#readme
|
||||
license: BSD3
|
||||
license-file: LICENSE
|
||||
author: Author name here
|
||||
maintainer: example@example.com
|
||||
copyright: 2017 Author name here
|
||||
category: Web
|
||||
build-type: Simple
|
||||
extra-source-files: README.md
|
||||
cabal-version: >=1.10
|
||||
|
||||
library
|
||||
hs-source-dirs: src
|
||||
exposed-modules: Database.Queue
|
||||
, Database.Queue.Migrate
|
||||
, Database.Queue.Main
|
||||
build-depends: base >= 4.7 && < 5
|
||||
, postgresql-simple
|
||||
, pg-transact
|
||||
, aeson
|
||||
, time
|
||||
, uuid
|
||||
, transformers
|
||||
, random
|
||||
, text
|
||||
default-language: Haskell2010
|
||||
|
||||
test-suite db-testing-example-test
|
||||
type: exitcode-stdio-1.0
|
||||
hs-source-dirs: test
|
||||
main-is: Spec.hs
|
||||
other-modules: Database.QueueSpec
|
||||
, Test.Setup
|
||||
build-depends: base
|
||||
, db-testing-example
|
||||
, hspec
|
||||
, hspec-discover
|
||||
, tmp-postgres
|
||||
, postgresql-simple
|
||||
, pg-transact
|
||||
, bytestring
|
||||
, aeson
|
||||
, hspec-expectations-lifted
|
||||
, text
|
||||
, resource-pool
|
||||
, async
|
||||
ghc-options: -threaded -rtsopts -with-rtsopts=-N
|
||||
default-language: Haskell2010
|
||||
|
||||
source-repository head
|
||||
type: git
|
||||
location: https://github.com/githubuser/db-testing-example
|
140
src/Database/Queue.hs
Normal file
140
src/Database/Queue.hs
Normal file
@ -0,0 +1,140 @@
|
||||
{-# LANGUAGE OverloadedStrings, GeneralizedNewtypeDeriving, ScopedTypeVariables, LambdaCase #-}
|
||||
{-# LANGUAGE QuasiQuotes, RecordWildCards #-}
|
||||
module Database.Queue where
|
||||
import Database.PostgreSQL.Simple (Connection, Only (..))
|
||||
import qualified Database.PostgreSQL.Simple as Simple
|
||||
import Database.PostgreSQL.Transact
|
||||
import Data.Aeson
|
||||
import Data.Time
|
||||
import Database.PostgreSQL.Simple.Transaction
|
||||
import Database.PostgreSQL.Simple.FromRow
|
||||
import Database.PostgreSQL.Simple.ToRow
|
||||
import Database.PostgreSQL.Simple.FromField
|
||||
import Database.PostgreSQL.Simple.ToField
|
||||
import Data.UUID
|
||||
import Data.Foldable
|
||||
import Control.Monad
|
||||
import Data.Maybe
|
||||
import Control.Monad.IO.Class
|
||||
import System.Random
|
||||
import Data.Text (Text)
|
||||
import qualified Data.Text as T
|
||||
import Database.PostgreSQL.Simple.SqlQQ
|
||||
import Database.PostgreSQL.Simple.Notification
|
||||
import Data.Function
|
||||
import Data.Int
|
||||
|
||||
newtype PayloadId = PayloadId { unPayloadId :: UUID }
|
||||
deriving (Eq, Show, FromField, ToField)
|
||||
|
||||
instance FromRow PayloadId where
|
||||
fromRow = fromOnly <$> fromRow
|
||||
|
||||
instance ToRow PayloadId where
|
||||
toRow = toRow . Only
|
||||
|
||||
data State = Enqueued | Locked | Dequeued
|
||||
deriving (Show, Eq, Ord, Enum, Bounded)
|
||||
|
||||
instance ToField State where
|
||||
toField = toField . \case
|
||||
Enqueued -> "enqueued" :: Text
|
||||
Locked -> "locked"
|
||||
Dequeued -> "dequeued"
|
||||
|
||||
instance FromField State where
|
||||
fromField f y = do
|
||||
name <- typename f
|
||||
if name == "state" then case y of
|
||||
Nothing -> returnError UnexpectedNull f "status can't be NULL"
|
||||
Just y' -> case y' of
|
||||
"enqueued" -> return Enqueued
|
||||
"locked" -> return Locked
|
||||
"dequeued" -> return Dequeued
|
||||
x -> returnError ConversionFailed f (show x)
|
||||
else
|
||||
returnError Incompatible f $
|
||||
"Expect type name to be status but it was " ++ show name
|
||||
|
||||
data Payload = Payload
|
||||
{ pId :: PayloadId
|
||||
, pValue :: Value
|
||||
, pCreated :: UTCTime
|
||||
, pState :: State
|
||||
} deriving (Show, Eq)
|
||||
|
||||
instance FromRow Payload where
|
||||
fromRow = Payload <$> field <*> field <*> field <*> field
|
||||
|
||||
enqueue :: Value -> DB PayloadId
|
||||
enqueue value = do
|
||||
pid <- liftIO randomIO
|
||||
execute [sql| INSERT INTO payloads (id, value)
|
||||
VALUES (?, ?);
|
||||
NOTIFY enqueue;
|
||||
|]
|
||||
(pid, value)
|
||||
return $ PayloadId pid
|
||||
|
||||
unlock = undefined -- use for testing lock and unlock until everything is done
|
||||
|
||||
tryLockDB :: DB (Maybe Payload)
|
||||
tryLockDB = do
|
||||
next <- listToMaybe <$> query_
|
||||
[sql| SELECT id, value, created, status
|
||||
FROM payloads
|
||||
WHERE status='enqueued'
|
||||
LIMIT 1
|
||||
|]
|
||||
for_ next $ \p -> void $ execute
|
||||
[sql| UPDATE payloads
|
||||
SET status='locked'
|
||||
WHERE id=?
|
||||
|] $
|
||||
pId p
|
||||
return next
|
||||
|
||||
-- This should also return the count
|
||||
tryLock :: Connection -> IO (Maybe Payload)
|
||||
tryLock = runDBTSerializable tryLockDB
|
||||
|
||||
notifyPayload :: Connection -> IO ()
|
||||
notifyPayload conn = do
|
||||
Notification {..} <- getNotification conn
|
||||
if notificationChannel == "enqueue" then
|
||||
return ()
|
||||
else
|
||||
notifyPayload conn
|
||||
|
||||
lock :: Connection -> IO Payload
|
||||
lock conn = do
|
||||
Simple.execute_ conn "LISTEN enqueue"
|
||||
fix $ \next -> do
|
||||
m <- tryLock conn
|
||||
case m of
|
||||
Nothing -> do
|
||||
notifyPayload conn
|
||||
next
|
||||
Just x -> do
|
||||
Simple.execute_ conn "UNLISTEN enqueue"
|
||||
return x
|
||||
|
||||
dequeue :: PayloadId -> DB ()
|
||||
dequeue queueId
|
||||
= void
|
||||
$ execute
|
||||
[sql| UPDATE payloads
|
||||
SET status='dequeued'
|
||||
WHERE id=?
|
||||
|]
|
||||
queueId
|
||||
|
||||
getCountDB :: DB Int64
|
||||
getCountDB = fromOnly . head <$> query_ [sql|
|
||||
SELECT count(*)
|
||||
FROM payloads
|
||||
WHERE status='enqueued'
|
||||
|]
|
||||
|
||||
getCount :: Connection -> IO Int64
|
||||
getCount = runDBTSerializable getCountDB
|
6
src/Database/Queue/Main.hs
Normal file
6
src/Database/Queue/Main.hs
Normal file
@ -0,0 +1,6 @@
|
||||
module Database.Queue.Main where
|
||||
import Database.Queue
|
||||
-- Make a defaultMain that takes in a processing functionx
|
||||
|
||||
defaultMain :: (Payload -> Int -> IO ()) -> IO ()
|
||||
defaultMain = undefined
|
20
src/Database/Queue/Migrate.hs
Normal file
20
src/Database/Queue/Migrate.hs
Normal file
@ -0,0 +1,20 @@
|
||||
{-# LANGUAGE QuasiQuotes #-}
|
||||
module Database.Queue.Migrate where
|
||||
import Database.PostgreSQL.Simple
|
||||
import Database.PostgreSQL.Simple.SqlQQ
|
||||
import Control.Monad
|
||||
|
||||
migrate :: Connection -> IO ()
|
||||
migrate conn = void $ execute_ conn [sql|
|
||||
CREATE TYPE state AS ENUM ('enqueued', 'locked', 'dequeued');
|
||||
|
||||
CREATE TABLE payloads
|
||||
( id uuid PRIMARY KEY
|
||||
, value jsonb NOT NULL
|
||||
, status state NOT NULL DEFAULT 'enqueued'
|
||||
, created TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT clock_timestamp()
|
||||
);
|
||||
|
||||
CREATE INDEX status_idx ON payloads (status);
|
||||
|]
|
||||
|
78
stack.yaml
Normal file
78
stack.yaml
Normal file
@ -0,0 +1,78 @@
|
||||
# This file was automatically generated by 'stack init'
|
||||
#
|
||||
# Some commonly used options have been documented as comments in this file.
|
||||
# For advanced use and comprehensive documentation of the format, please see:
|
||||
# http://docs.haskellstack.org/en/stable/yaml_configuration/
|
||||
|
||||
# Resolver to choose a 'specific' stackage snapshot or a compiler version.
|
||||
# A snapshot resolver dictates the compiler version and the set of packages
|
||||
# to be used for project dependencies. For example:
|
||||
#
|
||||
# resolver: lts-3.5
|
||||
# resolver: nightly-2015-09-21
|
||||
# resolver: ghc-7.10.2
|
||||
# resolver: ghcjs-0.1.0_ghc-7.10.2
|
||||
# resolver:
|
||||
# name: custom-snapshot
|
||||
# location: "./custom-snapshot.yaml"
|
||||
resolver: lts-8.18
|
||||
|
||||
# User packages to be built.
|
||||
# Various formats can be used as shown in the example below.
|
||||
#
|
||||
# packages:
|
||||
# - some-directory
|
||||
# - https://example.com/foo/bar/baz-0.0.2.tar.gz
|
||||
# - location:
|
||||
# git: https://github.com/commercialhaskell/stack.git
|
||||
# commit: e7b331f14bcffb8367cd58fbfc8b40ec7642100a
|
||||
# - location: https://github.com/commercialhaskell/stack/commit/e7b331f14bcffb8367cd58fbfc8b40ec7642100a
|
||||
# extra-dep: true
|
||||
# subdirs:
|
||||
# - auto-update
|
||||
# - wai
|
||||
#
|
||||
# A package marked 'extra-dep: true' will only be built if demanded by a
|
||||
# non-dependency (i.e. a user package), and its test suites and benchmarks
|
||||
# will not be run. This is useful for tweaking upstream packages.
|
||||
packages:
|
||||
- '.'
|
||||
- location:
|
||||
git: https://github.com/jfischoff/tmp-postgres
|
||||
commit: 35d0c6d4f64184e75b1d3b21dc2646d76e9d793c
|
||||
extra-dep: true
|
||||
- location:
|
||||
git: https://github.com/jfischoff/pg-transact
|
||||
commit: 86e8bb470be5a417dda44896a29cd1764f0cce69
|
||||
extra-dep: true
|
||||
|
||||
# Dependency packages to be pulled from upstream that are not in the resolver
|
||||
# (e.g., acme-missiles-0.3)
|
||||
extra-deps: [ "tmp-postgres-0.1.0.0"
|
||||
, "pg-transact-0.1.0.0"
|
||||
, "hspec-expectations-lifted-0.10.0"
|
||||
]
|
||||
|
||||
# Override default flag values for local packages and extra-deps
|
||||
flags: {}
|
||||
|
||||
# Extra package databases containing global packages
|
||||
extra-package-dbs: []
|
||||
|
||||
# Control whether we use the GHC we find on the path
|
||||
# system-ghc: true
|
||||
#
|
||||
# Require a specific version of stack, using version ranges
|
||||
# require-stack-version: -any # Default
|
||||
# require-stack-version: ">=1.4"
|
||||
#
|
||||
# Override the architecture used by stack, especially useful on Windows
|
||||
# arch: i386
|
||||
# arch: x86_64
|
||||
#
|
||||
# Extra directories used by stack for building
|
||||
# extra-include-dirs: [/path/to/dir]
|
||||
# extra-lib-dirs: [/path/to/dir]
|
||||
#
|
||||
# Allow a newer minor version of GHC than the snapshot specifies
|
||||
# compiler-check: newer-minor
|
75
test/Database/QueueSpec.hs
Normal file
75
test/Database/QueueSpec.hs
Normal file
@ -0,0 +1,75 @@
|
||||
{-# LANGUAGE RecordWildCards, OverloadedStrings #-}
|
||||
module Database.QueueSpec (spec, main) where
|
||||
import Database.Queue
|
||||
import Test.Hspec (Spec, hspec, it)
|
||||
import Test.Hspec.Expectations.Lifted
|
||||
import Test.Setup
|
||||
import Data.Aeson
|
||||
import Control.Concurrent
|
||||
import Data.IORef
|
||||
import Control.Monad
|
||||
import System.Timeout
|
||||
import Data.Function
|
||||
import Data.List
|
||||
import Data.Maybe
|
||||
import Data.Pool
|
||||
import Control.Concurrent.Async
|
||||
|
||||
main :: IO ()
|
||||
main = hspec spec
|
||||
|
||||
spec :: Spec
|
||||
spec = describeDB "Database.Queue" $ do
|
||||
itDB "empty locks nothing" $ do
|
||||
tryLockDB `shouldReturn` Nothing
|
||||
|
||||
itDB "enqueues/locks/dequeues" $ do
|
||||
payloadId <- enqueue $ String "Hello"
|
||||
Just Payload {..} <- tryLockDB
|
||||
|
||||
pId `shouldBe` payloadId
|
||||
pValue `shouldBe` String "Hello"
|
||||
tryLockDB `shouldReturn` Nothing
|
||||
|
||||
dequeue pId `shouldReturn` ()
|
||||
tryLockDB `shouldReturn` Nothing
|
||||
|
||||
it "enqueues and dequeues concurrently tryLock" $ \testDB -> do
|
||||
ref <- newIORef []
|
||||
|
||||
loopThreads <- replicateM 10 $ async $ fix $ \next -> do
|
||||
mpayload <- runDB testDB tryLockDB
|
||||
case mpayload of
|
||||
Nothing -> next
|
||||
Just x -> do
|
||||
lastCount <- atomicModifyIORef ref $ \xs -> (pValue x : xs, length xs + 1)
|
||||
runDB testDB $ dequeue $ pId x
|
||||
when (lastCount < 1001) next
|
||||
|
||||
-- Fork a hundred threads and enqueue an index
|
||||
forM_ [0 .. 1000 :: Int] $ \i -> forkIO $ void $ runDB testDB $ enqueue $ toJSON i
|
||||
|
||||
let expected = [0 .. 1000 :: Int]
|
||||
|
||||
waitAnyCancel loopThreads
|
||||
Just decoded <- mapM (decode . encode) <$> readIORef ref
|
||||
sort decoded `shouldBe` sort expected
|
||||
|
||||
|
||||
it "enqueues and dequeues concurrently lock" $ \testDB -> do
|
||||
ref <- newIORef []
|
||||
|
||||
loopThreads <- replicateM 10 $ async $ fix $ \next -> do
|
||||
x <- withConnection testDB lock
|
||||
lastCount <- atomicModifyIORef ref $ \xs -> (pValue x : xs, length xs + 1)
|
||||
runDB testDB $ dequeue $ pId x
|
||||
when (lastCount < 1001) next
|
||||
|
||||
-- Fork a hundred threads and enqueue an index
|
||||
forM_ [0 .. 1000 :: Int] $ \i -> forkIO $ void $ runDB testDB $ enqueue $ toJSON i
|
||||
|
||||
let expected = [0 .. 1000 :: Int]
|
||||
|
||||
waitAnyCancel loopThreads
|
||||
Just decoded <- mapM (decode . encode) <$> readIORef ref
|
||||
sort decoded `shouldBe` sort expected
|
1
test/Spec.hs
Normal file
1
test/Spec.hs
Normal file
@ -0,0 +1 @@
|
||||
{-# OPTIONS_GHC -F -pgmF hspec-discover #-}
|
53
test/Test/Setup.hs
Normal file
53
test/Test/Setup.hs
Normal file
@ -0,0 +1,53 @@
|
||||
{-# LANGUAGE RecordWildCards #-}
|
||||
module Test.Setup where
|
||||
import Test.Hspec
|
||||
import Database.Queue.Migrate
|
||||
import Database.Queue
|
||||
import qualified Database.Postgres.Temp as Temp
|
||||
import Database.PostgreSQL.Simple
|
||||
import Database.PostgreSQL.Transact
|
||||
import Database.PostgreSQL.Simple.Transaction
|
||||
import qualified Data.ByteString.Char8 as BSC
|
||||
import Control.Exception
|
||||
import Control.Monad
|
||||
import Data.Pool
|
||||
|
||||
data TestDB = TestDB
|
||||
{ tempDB :: Temp.DB
|
||||
, connection :: Pool Connection
|
||||
}
|
||||
|
||||
setupDB :: IO TestDB
|
||||
setupDB = do
|
||||
tempDB <- either throwIO return =<< Temp.startAndLogToTmp
|
||||
-- tempDB <- either throwIO return =<< Temp.start
|
||||
putStrLn $ Temp.connectionString tempDB
|
||||
connection <- createPool
|
||||
(connectPostgreSQL $ BSC.pack $ Temp.connectionString tempDB)
|
||||
close
|
||||
1
|
||||
100000000
|
||||
50
|
||||
withResource connection migrate
|
||||
return TestDB {..}
|
||||
|
||||
teardownDB :: TestDB -> IO ()
|
||||
teardownDB TestDB {..} = do
|
||||
destroyAllResources connection
|
||||
void $ Temp.stop tempDB
|
||||
|
||||
withConnection :: TestDB -> (Connection -> IO a) -> IO a
|
||||
withConnection testDB = withResource (connection testDB)
|
||||
|
||||
withDB :: DB a -> TestDB -> IO a
|
||||
withDB action testDB = withResource (connection testDB) (runDBTSerializable action)
|
||||
|
||||
runDB :: TestDB -> DB a -> IO a
|
||||
runDB = flip withDB
|
||||
|
||||
itDB :: String -> DB a -> SpecWith TestDB
|
||||
itDB msg action = it msg $ void . withDB action
|
||||
|
||||
describeDB :: String -> SpecWith TestDB -> Spec
|
||||
describeDB str = beforeAll setupDB . afterAll teardownDB . describe str
|
||||
|
Loading…
Reference in New Issue
Block a user