diff --git a/bench/Bench.hs b/bench/Bench.hs index 40a778d..54be9bf 100644 --- a/bench/Bench.hs +++ b/bench/Bench.hs @@ -35,7 +35,16 @@ benchRequests connectAction queryAction = do benchMultiPw :: IO () benchMultiPw = benchRequests createConnection $ \c -> do - sendBatchAndSync c [q] + sendBatchAndSync c [q, q, q, q, q, q, q, q, q, q] + readNextData c + readNextData c + readNextData c + readNextData c + readNextData c + readNextData c + readNextData c + readNextData c + readNextData c readNextData c waitReadyForQuery c where diff --git a/src/Database/PostgreSQL/Driver/Connection.hs b/src/Database/PostgreSQL/Driver/Connection.hs index 8d1920e..4009467 100644 --- a/src/Database/PostgreSQL/Driver/Connection.hs +++ b/src/Database/PostgreSQL/Driver/Connection.hs @@ -229,7 +229,10 @@ receiverThread :: RawConnection -> InDataChan -> IO () -receiverThread rawConn dataChan = receiveLoop Nothing "" [] +receiverThread rawConn dataChan = + loopExtractDataRows + (rReceive rawConn 4096) + (writeChan dataChan . Right) where receiveLoop :: Maybe Header diff --git a/src/Database/PostgreSQL/Protocol/Decoders.hs b/src/Database/PostgreSQL/Protocol/Decoders.hs index e8be894..65bd60e 100644 --- a/src/Database/PostgreSQL/Protocol/Decoders.hs +++ b/src/Database/PostgreSQL/Protocol/Decoders.hs @@ -7,6 +7,7 @@ module Database.PostgreSQL.Protocol.Decoders -- * Helpers , parseServerVersion , parseIntegerDatetimes + , loopExtractDataRows ) where import Control.Applicative @@ -14,15 +15,116 @@ import Control.Monad import Data.Monoid ((<>)) import Data.Maybe (fromMaybe) import Data.Char (chr) +import Data.Word import Text.Read (readMaybe) import qualified Data.Vector as V import qualified Data.ByteString as B +import qualified Data.ByteString.Lazy as BL import Data.ByteString.Char8 as BS(readInteger, readInt, unpack, pack) import qualified Data.HashMap.Strict as HM import Database.PostgreSQL.Protocol.Types import Database.PostgreSQL.Protocol.Store.Decode +-- Extracts DataRows +-- +data ExtractorResult = NeedMore | OtherHeader + +loopExtractDataRows + :: IO B.ByteString -- read more action + -> (DataMessage -> IO ()) -- callback on every DataMessage + -> IO () +loopExtractDataRows readMoreAction callback = go "" [] + where + go :: B.ByteString -> [B.ByteString] -> IO () + -- header size + go bs acc + | B.length bs < 5 = readMoreAndGo bs acc + | otherwise = do + -- print "Main branch" + let (offset, r) = scanDataRows 0 bs + let (ch, nbs) = B.splitAt offset bs + let (newAcc, newBs) = if B.null ch + then (acc, bs) + else (ch:acc, nbs) + case r of + NeedMore -> readMoreAndGo newBs newAcc + OtherHeader -> do + let (Header mt len) = parseHeader newBs + goOther mt len (B.drop 5 newBs) newAcc + + goOther :: Word8 -> Int -> B.ByteString -> [B.ByteString] -> IO () + goOther mt len bs acc = case chr (fromIntegral mt) of + 'C' -> do + newBs <- skipBytes bs len + callback $ DataMessage . DataRows $ BL.fromChunks $ reverse acc + go newBs [] + 'I' -> do + callback $ DataMessage . DataRows $ BL.fromChunks $ reverse acc + go bs [] + 'E' -> do + (b, newBs) <- readAtLeast bs len + desc <- eitherToDecode $ parseErrorDesc b + callback (DataError desc) + go newBs [] + 'Z' -> do + newBs <- skipBytes bs len + callback DataReady + go newBs acc + c -> do + -- print $ "Unexpected: " ++ show c + newBs <- skipBytes bs len + go newBs acc + + readMoreAndGo bs acc = do + -- print "Read more and go" + newBs <- readMoreAction + go (bs <> newBs) acc + + readAtLeast :: B.ByteString -> Int -> IO (B.ByteString, B.ByteString) + readAtLeast bs len | B.length bs >= len = pure $ B.splitAt len bs + | otherwise = do + newBs <- readMoreAction + readAtLeast (bs <> newBs) len + + skipBytes :: B.ByteString -> Int -> IO B.ByteString + skipBytes bs toSkip | toSkip <= 0 = pure bs + | B.length bs < toSkip = do + print $ "to skip " ++ show toSkip + newBs <- readMoreAction + skipBytes newBs (toSkip - B.length bs) + | otherwise = pure $ B.drop toSkip bs + +scanDataRows :: Int -> B.ByteString -> (Int, ExtractorResult) +scanDataRows !acc !bs | B.length bs < 5 = (acc, NeedMore) + | otherwise = + let (Header mt len) = parseHeader bs + in if chr (fromIntegral mt) == 'D' + then if B.length bs < len + 5 + then (acc, NeedMore) + else scanDataRows (acc + len + 5) + $ B.drop (len + 5) bs + else (acc, OtherHeader) + +parseHeader :: B.ByteString -> Header +parseHeader bs = + let w1 = B.index bs 1 + w2 = B.index bs 2 + w3 = B.index bs 3 + w4 = B.index bs 4 + w = fromIntegral w1 * 256 * 256 * 256 + + fromIntegral w2 * 256 * 256 + + fromIntegral w3 * 256 + + fromIntegral w4 + in Header (B.index bs 0) (w - 4) + + +-- MT_COMMAND_COMPLETE 'C' +-- MT_EMPTY_QUERY_RESPONSE 'I' +-- MT_ERROR_RESPONSE 'E' +-- MT_READY_FOR_QUERY 'Z' + + decodeAuthResponse :: Decode AuthResponse decodeAuthResponse = do c <- getWord8 @@ -252,6 +354,6 @@ parseNoticeDesc s = do "is not presented in NoticeResponse message") Right . HM.lookup c -eitherToDecode :: Either B.ByteString a -> Decode a +eitherToDecode :: Monad m => Either B.ByteString a -> m a eitherToDecode = either (fail . BS.unpack) pure