mirror of
https://github.com/postgres-haskell/postgres-wire.git
synced 2024-11-22 05:53:12 +03:00
benchmarks as cli command
This commit is contained in:
parent
defecb8103
commit
4d123e9d2a
246
bench/Bench.hs
246
bench/Bench.hs
@ -1,11 +1,11 @@
|
||||
{-# language BangPatterns #-}
|
||||
{-# language LambdaCase #-}
|
||||
module Main where
|
||||
|
||||
import Data.ByteString.Lazy (toStrict)
|
||||
import qualified Data.ByteString.Lazy as BL
|
||||
import qualified Data.ByteString as B
|
||||
import Data.ByteString.Builder (toLazyByteString)
|
||||
import Data.ByteString (ByteString)
|
||||
import Data.Vector as V(fromList, empty)
|
||||
import Data.IORef
|
||||
import Data.Int
|
||||
@ -15,22 +15,15 @@ import Control.Concurrent
|
||||
import Control.Applicative
|
||||
import Control.Monad
|
||||
import Data.Monoid
|
||||
import Control.DeepSeq
|
||||
import System.IO.Unsafe
|
||||
import System.Clock
|
||||
import Options.Applicative
|
||||
|
||||
import qualified Database.PostgreSQL.LibPQ as LibPQ
|
||||
|
||||
import Database.PostgreSQL.Protocol.Types
|
||||
import Database.PostgreSQL.Protocol.Encoders
|
||||
import Database.PostgreSQL.Protocol.Decoders
|
||||
import Database.PostgreSQL.Protocol.DataRows
|
||||
import Database.PostgreSQL.Protocol.Store.Decode
|
||||
import Database.PostgreSQL.Protocol.Codecs.Decoders
|
||||
import Database.PostgreSQL.Driver.Connection
|
||||
import Database.PostgreSQL.Driver
|
||||
import Criterion.Main
|
||||
|
||||
--
|
||||
-- CREATE TABLE _bytes_100_of_1k(b bytea);
|
||||
-- CREATE TABLE _bytes_400_of_200(b bytea);
|
||||
-- CREATE TABLE _bytes_10_of_20k(b bytea);
|
||||
@ -39,69 +32,107 @@ import Criterion.Main
|
||||
|
||||
-- INSERT INTO _bytes_100_of_1k(b)
|
||||
-- (SELECT repeat('a', 1000)::bytea FROM generate_series(1, 100));
|
||||
--
|
||||
-- INSERT INTO _bytes_400_of_200(b)
|
||||
-- (SELECT repeat('a', 200)::bytea FROM generate_series(1, 400));
|
||||
--
|
||||
-- INSERT INTO _bytes_10_of_20k(b)
|
||||
-- (SELECT repeat('a', 20000)::bytea FROM generate_series(1, 10));
|
||||
--
|
||||
-- INSERT INTO _bytes_1_of_200(b) VALUES(repeat('a', 200)::bytea);
|
||||
--
|
||||
-- INSERT INTO _bytes_300_of_100(b)
|
||||
-- (SELECT repeat('a', 100)::bytea FROM generate_series(1, 300));
|
||||
|
||||
main = defaultMain
|
||||
-- [ bgroup "Requests"
|
||||
-- [
|
||||
-- -- env createConnection (\c -> bench "100 of 1k" . nfIO $ requestAction c)
|
||||
-- bench "parser" $ nf parse bs
|
||||
-- ]
|
||||
-- ]
|
||||
data Action
|
||||
= BenchPW RowsType
|
||||
| BenchLibPQ RowsType
|
||||
| BenchLoop
|
||||
deriving (Show, Eq)
|
||||
|
||||
-- benchDataRowDecoder d bs = decodeManyRows d $
|
||||
-- DataRows (DataChunk 380 bs) Empty
|
||||
-- where
|
||||
-- decodeDataRow = do
|
||||
-- (Header _ len) <- decodeHeader
|
||||
-- getByteString len
|
||||
data RowsType
|
||||
= Bytes100_1k
|
||||
| Bytes400_200
|
||||
| Bytes10_20k
|
||||
| Bytes1_200
|
||||
| Bytes300_100
|
||||
deriving (Show, Eq)
|
||||
|
||||
{-# NOINLINE bs #-}
|
||||
bs :: B.ByteString
|
||||
bs = unsafePerformIO $ B.readFile "1.txt"
|
||||
cli :: Parser Action
|
||||
cli = hsubparser $
|
||||
cmd "pw" "benchmark postgres-wire" (BenchPW <$> rowTypeParser)
|
||||
<> cmd "libpq" "benchmark libpq" (BenchLibPQ <$> rowTypeParser)
|
||||
<> cmd "loop" "benchmark datarows decoding loop" (pure BenchLoop)
|
||||
where
|
||||
cmd c h p = command c (info (helper <*> p) $ header h)
|
||||
rowTypeParser = hsubparser $
|
||||
cmd "b100_1k" "100 rows of 1k bytes" (pure Bytes100_1k)
|
||||
<> cmd "b400_200" "400 rows of 200 bytes" (pure Bytes400_200)
|
||||
<> cmd "b10_20k" "10 rows of 20k bytes" (pure Bytes10_20k)
|
||||
<> cmd "b1_200" "1 row of 200 bytes" (pure Bytes1_200)
|
||||
<> cmd "b300_100" "300 rows of 100 bytes" (pure Bytes300_100)
|
||||
|
||||
benchLoop :: IO ()
|
||||
benchLoop = do
|
||||
ref <- newIORef 0 :: IO (IORef Word)
|
||||
rbs <- newIORef "" :: IO (IORef BL.ByteString)
|
||||
!bs <- B.readFile "1.txt"
|
||||
let str = BL.cycle $ BL.fromStrict bs
|
||||
writeIORef rbs str
|
||||
main :: IO ()
|
||||
main = execParser (info (helper <*> cli) $ header "Postgres-wire benchmark")
|
||||
>>= execAction
|
||||
|
||||
let handler dm = case dm of
|
||||
DataMessage _ -> modifyIORef' ref (+1)
|
||||
_ -> pure ()
|
||||
newChunk preBs = do
|
||||
b <- readIORef rbs
|
||||
let (nb, rest) = BL.splitAt 4096 b
|
||||
writeIORef rbs rest
|
||||
-- let res = preBs <> (B.copy $ BL.toStrict nb)
|
||||
let res = preBs <> ( BL.toStrict nb)
|
||||
res `seq` pure res
|
||||
tid <- forkIO $ forever $ loopExtractDataRows newChunk handler
|
||||
threadDelay 1000000
|
||||
killThread tid
|
||||
s <- readIORef ref
|
||||
print $ "Requests: " ++ show s
|
||||
execAction :: Action -> IO ()
|
||||
execAction (BenchPW rows) = benchPw $ queryStatement rows
|
||||
execAction (BenchLibPQ rows) = benchLibpq $ queryStatement rows
|
||||
execAction BenchLoop = benchLoop
|
||||
|
||||
queryStatement :: RowsType -> B.ByteString
|
||||
queryStatement = \case
|
||||
Bytes100_1k -> "SELECT * from _bytes_100_of_1k"
|
||||
Bytes400_200 -> "SELECT * from _bytes_400_of_200"
|
||||
Bytes10_20k -> "SELECT * from _bytes_10_of_20k"
|
||||
Bytes1_200 -> "SELECT * fromm _bytes_1_of_200"
|
||||
Bytes300_100 -> "SELECT * from _bytes_300_of_100"
|
||||
|
||||
benchPw :: B.ByteString -> IO ()
|
||||
benchPw statement = benchRequests createConnection $ \c -> do
|
||||
sendBatchAndSync c [q]
|
||||
d <- readNextData c
|
||||
waitReadyForQuery c
|
||||
where
|
||||
q = Query statement V.empty Binary Binary AlwaysCache
|
||||
createConnection = connect defaultSettings >>=
|
||||
either (error . ("Connection error " <>) . show) pure
|
||||
|
||||
defaultSettings = defaultConnectionSettings
|
||||
{ settingsHost = "localhost"
|
||||
, settingsDatabase = "travis_test"
|
||||
, settingsUser = "postgres"
|
||||
, settingsPassword = ""
|
||||
}
|
||||
|
||||
benchLibpq :: B.ByteString -> IO ()
|
||||
benchLibpq statement = benchRequests libpqConnection $ \c -> do
|
||||
r <- fromJust <$> LibPQ.execPrepared c "" [] LibPQ.Binary
|
||||
rows <- LibPQ.ntuples r
|
||||
parseRows r (rows - 1)
|
||||
where
|
||||
libpqConnection = do
|
||||
conn <- LibPQ.connectdb "host=localhost user=postgres dbname=travis_test"
|
||||
LibPQ.prepare conn "" "SELECT * from _bytes_300_of_100" Nothing
|
||||
pure conn
|
||||
parseRows r (-1) = pure ()
|
||||
parseRows r n = LibPQ.getvalue r n 0 >> parseRows r (n - 1)
|
||||
|
||||
benchRequests :: IO c -> (c -> IO a) -> IO ()
|
||||
benchRequests connectAction queryAction = do
|
||||
rs <- replicateM 8 newThread
|
||||
threadDelay $ 2 *1000000
|
||||
traverse (\(_,_, tid) -> killThread tid) rs
|
||||
s <- sum <$> traverse (\(ref, _, _) -> readIORef ref) rs
|
||||
latency_total <- sum <$> traverse (\(_, ref, _) -> readIORef ref) rs
|
||||
print $ "Requests: " ++ show s
|
||||
print $ "Average latency: " ++ show (latency_total `div` fromIntegral s)
|
||||
results <- replicateM 8 newThread
|
||||
threadDelay $ durationSeconds * 1000 * 1000
|
||||
for_ results $ \(_, _, tid) -> killThread tid
|
||||
s <- sum <$> traverse (\(ref, _, _) -> readIORef ref) results
|
||||
latency_total <- sum <$> traverse (\(_, ref, _) -> readIORef ref) results
|
||||
|
||||
print $ "Requests per second: " ++ show (s `div` durationSeconds)
|
||||
print $ "Average latency, ms: " ++ displayLatency latency_total s
|
||||
where
|
||||
durationSeconds = 10
|
||||
newThread = do
|
||||
ref_count <- newIORef 0 :: IO (IORef Word)
|
||||
ref_count <- newIORef 0 :: IO (IORef Int)
|
||||
ref_latency <- newIORef 0 :: IO (IORef Int64)
|
||||
c <- connectAction
|
||||
tid <- forkIO $ forever $ do
|
||||
@ -113,82 +144,33 @@ benchRequests connectAction queryAction = do
|
||||
modifyIORef' ref_count (+1)
|
||||
pure (ref_count, ref_latency, tid)
|
||||
|
||||
getDifference (TimeSpec end_s end_ns) (TimeSpec start_s start_ns) =
|
||||
(end_s - start_s) * 1000000000 + end_ns - start_ns
|
||||
getDifference (TimeSpec end_s end_ns) (TimeSpec start_s start_ns) =
|
||||
(end_s - start_s) * 1000000000 + end_ns - start_ns
|
||||
|
||||
requestAction c = replicateM_ 100 $ do
|
||||
sendBatchAndSync c [q]
|
||||
readNextData c
|
||||
waitReadyForQuery c
|
||||
where
|
||||
q = Query largeStmt V.empty Binary Binary AlwaysCache
|
||||
largeStmt = "SELECT * from _bytes_1_of_200"
|
||||
displayLatency latency reqs =
|
||||
let a = latency `div` fromIntegral reqs
|
||||
(ms, ns) = a `divMod` 1000000
|
||||
in show ms <> "." <> show ns
|
||||
|
||||
benchMultiPw :: IO ()
|
||||
benchMultiPw = benchRequests createConnection $ \c -> do
|
||||
sendBatchAndSync c [q]
|
||||
d <- readNextData c
|
||||
waitReadyForQuery c
|
||||
where
|
||||
q = Query largeStmt V.empty Binary Binary AlwaysCache
|
||||
largeStmt = "SELECT * from _bytes_300_of_100"
|
||||
-- largeStmt = "select typname, typnamespace, typowner, typlen, typbyval,"
|
||||
-- <> "typcategory, typispreferred, typisdefined, typdelim,"
|
||||
-- <> "typrelid, typelem, typarray from pg_type"
|
||||
benchLoop :: IO ()
|
||||
benchLoop = do
|
||||
counter <- newIORef 0 :: IO (IORef Word)
|
||||
content <- newIORef "" :: IO (IORef BL.ByteString)
|
||||
-- TODO read file
|
||||
!bs <- B.readFile "1.txt"
|
||||
writeIORef content . BL.cycle $ BL.fromStrict bs
|
||||
|
||||
benchLibpq :: IO ()
|
||||
benchLibpq = benchRequests libpqConnection $ \c -> do
|
||||
r <- fromJust <$> LibPQ.execPrepared c "" [] LibPQ.Binary
|
||||
rows <- LibPQ.ntuples r
|
||||
go r (rows - 1)
|
||||
where
|
||||
libpqConnection = do
|
||||
conn <- LibPQ.connectdb "host=localhost user=postgres dbname=travis_test"
|
||||
LibPQ.prepare conn "" "SELECT * from _bytes_300_of_100" Nothing
|
||||
pure conn
|
||||
go r (-1) = pure ()
|
||||
go r n = LibPQ.getvalue r n 0 >> go r (n - 1)
|
||||
|
||||
|
||||
-- Connection
|
||||
-- | Creates connection with default filter.
|
||||
createConnection :: IO Connection
|
||||
createConnection = getConnection <$> connect defaultSettings
|
||||
|
||||
getConnection :: Either Error Connection -> Connection
|
||||
getConnection (Left e) = error $ "Connection error " ++ show e
|
||||
getConnection (Right c) = c
|
||||
|
||||
defaultSettings = defaultConnectionSettings
|
||||
{ settingsHost = "localhost"
|
||||
, settingsDatabase = "travis_test"
|
||||
, settingsUser = "postgres"
|
||||
, settingsPassword = ""
|
||||
}
|
||||
|
||||
-- Orphans
|
||||
|
||||
instance NFData (AbsConnection a) where
|
||||
rnf _ = ()
|
||||
|
||||
instance NFData Error where
|
||||
rnf _ = ()
|
||||
|
||||
instance (NFData a1, NFData a2, NFData a3, NFData a4, NFData a5, NFData a6, NFData a7, NFData a8, NFData a9, NFData a10, NFData a11, NFData a12) =>
|
||||
NFData (a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12) where
|
||||
rnf (x1, x2, x3, x4, x5, x6, x7, x8, x9, x10, x11, x12) =
|
||||
rnf x1 `seq`
|
||||
rnf x2 `seq`
|
||||
rnf x3 `seq`
|
||||
rnf x4 `seq`
|
||||
rnf x5 `seq`
|
||||
rnf x6 `seq`
|
||||
rnf x7 `seq`
|
||||
rnf x8 `seq`
|
||||
rnf x9 `seq`
|
||||
rnf x10 `seq`
|
||||
rnf x11 `seq`
|
||||
rnf x12
|
||||
|
||||
instance NFData (Decode a) where
|
||||
rnf !d = ()
|
||||
let handler dm = case dm of
|
||||
DataMessage _ -> modifyIORef' counter (+1)
|
||||
_ -> pure ()
|
||||
newChunk preBs = do
|
||||
b <- readIORef content
|
||||
let (nb, rest) = BL.splitAt 4096 b
|
||||
writeIORef content rest
|
||||
let res = preBs <> ( BL.toStrict nb)
|
||||
res `seq` pure res
|
||||
tid <- forkIO . forever $ loopExtractDataRows newChunk handler
|
||||
threadDelay 10000000
|
||||
killThread tid
|
||||
s <- readIORef counter
|
||||
print $ "Data messages parsed: " ++ show s
|
||||
|
@ -122,6 +122,7 @@ benchmark postgres-wire-bench
|
||||
, deepseq
|
||||
, postgresql-libpq
|
||||
, clock
|
||||
, optparse-applicative
|
||||
ghc-options: -O2 -threaded -rtsopts -with-rtsopts=-s
|
||||
default-language: Haskell2010
|
||||
default-extensions:
|
||||
|
@ -61,6 +61,7 @@ decodeAuthResponse = do
|
||||
_ -> fail "Unknown authentication response"
|
||||
_ -> fail "Invalid auth response"
|
||||
|
||||
{-# INLINE decodeHeader #-}
|
||||
decodeHeader :: Decode Header
|
||||
decodeHeader = Header <$> getWord8 <*>
|
||||
(fromIntegral . subtract 4 <$> getWord32BE)
|
||||
@ -98,6 +99,7 @@ decodeServerMessage (Header c len) = case chr $ fromIntegral c of
|
||||
rowsCount <- fromIntegral <$> getWord16BE
|
||||
RowDescription <$> V.replicateM rowsCount decodeFieldDescription
|
||||
|
||||
{-# INLINE decodeTransactionStatus #-}
|
||||
decodeTransactionStatus :: Decode TransactionStatus
|
||||
decodeTransactionStatus = getWord8 >>= \t ->
|
||||
case chr $ fromIntegral t of
|
||||
@ -116,12 +118,14 @@ decodeFieldDescription = FieldDescription
|
||||
<*> getInt32BE
|
||||
<*> decodeFormat
|
||||
|
||||
{-# INLINE decodeNotification #-}
|
||||
decodeNotification :: Decode Notification
|
||||
decodeNotification = Notification
|
||||
<$> (ServerProcessId <$> getWord32BE)
|
||||
<*> (ChannelName <$> getByteStringNull)
|
||||
<*> getByteStringNull
|
||||
|
||||
{-# INLINE decodeFormat #-}
|
||||
decodeFormat :: Decode Format
|
||||
decodeFormat = getWord16BE >>= \f ->
|
||||
case f of
|
||||
@ -130,6 +134,7 @@ decodeFormat = getWord16BE >>= \f ->
|
||||
_ -> fail "Unknown field format"
|
||||
|
||||
-- | Helper to lift Either in Decode
|
||||
{-# INLINE eitherToDecode #-}
|
||||
eitherToDecode :: Either B.ByteString a -> Decode a
|
||||
eitherToDecode = either (fail . BS.unpack) pure
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user