Extractor DataRows

This commit is contained in:
VyacheslavHashov 2017-02-14 00:11:27 +03:00
parent f610b570de
commit 4957a61686
3 changed files with 117 additions and 3 deletions

View File

@ -35,7 +35,16 @@ benchRequests connectAction queryAction = do
benchMultiPw :: IO ()
benchMultiPw = benchRequests createConnection $ \c -> do
sendBatchAndSync c [q]
sendBatchAndSync c [q, q, q, q, q, q, q, q, q, q]
readNextData c
readNextData c
readNextData c
readNextData c
readNextData c
readNextData c
readNextData c
readNextData c
readNextData c
readNextData c
waitReadyForQuery c
where

View File

@ -229,7 +229,10 @@ receiverThread
:: RawConnection
-> InDataChan
-> IO ()
receiverThread rawConn dataChan = receiveLoop Nothing "" []
receiverThread rawConn dataChan =
loopExtractDataRows
(rReceive rawConn 4096)
(writeChan dataChan . Right)
where
receiveLoop
:: Maybe Header

View File

@ -7,6 +7,7 @@ module Database.PostgreSQL.Protocol.Decoders
-- * Helpers
, parseServerVersion
, parseIntegerDatetimes
, loopExtractDataRows
) where
import Control.Applicative
@ -14,15 +15,116 @@ import Control.Monad
import Data.Monoid ((<>))
import Data.Maybe (fromMaybe)
import Data.Char (chr)
import Data.Word
import Text.Read (readMaybe)
import qualified Data.Vector as V
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
import Data.ByteString.Char8 as BS(readInteger, readInt, unpack, pack)
import qualified Data.HashMap.Strict as HM
import Database.PostgreSQL.Protocol.Types
import Database.PostgreSQL.Protocol.Store.Decode
-- Extracts DataRows
--
data ExtractorResult = NeedMore | OtherHeader
loopExtractDataRows
:: IO B.ByteString -- read more action
-> (DataMessage -> IO ()) -- callback on every DataMessage
-> IO ()
loopExtractDataRows readMoreAction callback = go "" []
where
go :: B.ByteString -> [B.ByteString] -> IO ()
-- header size
go bs acc
| B.length bs < 5 = readMoreAndGo bs acc
| otherwise = do
-- print "Main branch"
let (offset, r) = scanDataRows 0 bs
let (ch, nbs) = B.splitAt offset bs
let (newAcc, newBs) = if B.null ch
then (acc, bs)
else (ch:acc, nbs)
case r of
NeedMore -> readMoreAndGo newBs newAcc
OtherHeader -> do
let (Header mt len) = parseHeader newBs
goOther mt len (B.drop 5 newBs) newAcc
goOther :: Word8 -> Int -> B.ByteString -> [B.ByteString] -> IO ()
goOther mt len bs acc = case chr (fromIntegral mt) of
'C' -> do
newBs <- skipBytes bs len
callback $ DataMessage . DataRows $ BL.fromChunks $ reverse acc
go newBs []
'I' -> do
callback $ DataMessage . DataRows $ BL.fromChunks $ reverse acc
go bs []
'E' -> do
(b, newBs) <- readAtLeast bs len
desc <- eitherToDecode $ parseErrorDesc b
callback (DataError desc)
go newBs []
'Z' -> do
newBs <- skipBytes bs len
callback DataReady
go newBs acc
c -> do
-- print $ "Unexpected: " ++ show c
newBs <- skipBytes bs len
go newBs acc
readMoreAndGo bs acc = do
-- print "Read more and go"
newBs <- readMoreAction
go (bs <> newBs) acc
readAtLeast :: B.ByteString -> Int -> IO (B.ByteString, B.ByteString)
readAtLeast bs len | B.length bs >= len = pure $ B.splitAt len bs
| otherwise = do
newBs <- readMoreAction
readAtLeast (bs <> newBs) len
skipBytes :: B.ByteString -> Int -> IO B.ByteString
skipBytes bs toSkip | toSkip <= 0 = pure bs
| B.length bs < toSkip = do
print $ "to skip " ++ show toSkip
newBs <- readMoreAction
skipBytes newBs (toSkip - B.length bs)
| otherwise = pure $ B.drop toSkip bs
scanDataRows :: Int -> B.ByteString -> (Int, ExtractorResult)
scanDataRows !acc !bs | B.length bs < 5 = (acc, NeedMore)
| otherwise =
let (Header mt len) = parseHeader bs
in if chr (fromIntegral mt) == 'D'
then if B.length bs < len + 5
then (acc, NeedMore)
else scanDataRows (acc + len + 5)
$ B.drop (len + 5) bs
else (acc, OtherHeader)
parseHeader :: B.ByteString -> Header
parseHeader bs =
let w1 = B.index bs 1
w2 = B.index bs 2
w3 = B.index bs 3
w4 = B.index bs 4
w = fromIntegral w1 * 256 * 256 * 256 +
fromIntegral w2 * 256 * 256 +
fromIntegral w3 * 256 +
fromIntegral w4
in Header (B.index bs 0) (w - 4)
-- MT_COMMAND_COMPLETE 'C'
-- MT_EMPTY_QUERY_RESPONSE 'I'
-- MT_ERROR_RESPONSE 'E'
-- MT_READY_FOR_QUERY 'Z'
decodeAuthResponse :: Decode AuthResponse
decodeAuthResponse = do
c <- getWord8
@ -252,6 +354,6 @@ parseNoticeDesc s = do
"is not presented in NoticeResponse message")
Right . HM.lookup c
eitherToDecode :: Either B.ByteString a -> Decode a
eitherToDecode :: Monad m => Either B.ByteString a -> m a
eitherToDecode = either (fail . BS.unpack) pure