More smart receiver buffer

This commit is contained in:
VyacheslavHashov 2017-07-11 23:48:34 +03:00
parent 1b0b59ddaa
commit c06fe27ac9
6 changed files with 46 additions and 11 deletions

View File

@ -161,12 +161,12 @@ authorize rawConn settings = do
readAuthResponse = do
-- 1024 should be enough for the auth response from a server at
-- the startup phase.
resp <- rReceive rawConn 1024
resp <- rReceive rawConn mempty 1024
case runDecode decodeAuthResponse resp of
(rest, r) -> case r of
AuthenticationOk ->
parseParameters
(\bs -> (bs <>) <$> rReceive rawConn 1024) rest
(\bs -> rReceive rawConn bs 1024) rest
AuthenticationCleartextPassword ->
performPasswordAuth makePlainPassword
AuthenticationMD5Password (MD5Salt salt) ->
@ -219,8 +219,10 @@ buildConnection rawConn connParams receiverAction = do
}
-- | Parses connection parameters.
parseParameters :: (B.ByteString -> IO B.ByteString)
-> B.ByteString -> IO (Either Error ConnectionParameters)
parseParameters
:: (B.ByteString -> IO B.ByteString)
-> B.ByteString
-> IO (Either Error ConnectionParameters)
parseParameters action str = Right <$> do
dict <- parseDict str HM.empty
serverVersion <- eitherToProtocolEx . parseServerVersion =<<
@ -261,7 +263,7 @@ receiverThread :: RawConnection -> InDataChan -> IO ()
receiverThread rawConn dataChan = loopExtractDataRows
-- TODO
-- dont append strings. Allocate buffer manually and use unsafeReceive
(\bs -> (bs <>) <$> rReceive rawConn 4096)
(\bs -> rReceive rawConn bs 4096)
(writeChan dataChan . Right)
-- | Any exception prevents thread from future work.
@ -279,7 +281,7 @@ receiverThreadCommon rawConn chan msgFilter ntfHandler = go ""
-- TODO
-- dont append strings. Allocate buffer manually and use unsafeReceive
readMoreAction = (\bs -> (bs <>) <$> rReceive rawConn 4096)
readMoreAction = (\bs -> rReceive rawConn bs 4096)
handler msg = do
dispatchIfNotification msg ntfHandler
when (msgFilter msg) $ writeChan chan $ Right msg

View File

@ -7,9 +7,11 @@ module Database.PostgreSQL.Driver.Error
, ReceiverException(..)
, IncorrectUsage
, ProtocolException
, PeerClosedConnection
-- * helpers
, throwIncorrectUsage
, throwProtocolEx
, throwClosedException
, eitherToProtocolEx
, throwErrorInIO
, throwAuthErrorInIO
@ -61,18 +63,30 @@ instance Exception ProtocolException where
displayException (ProtocolException msg) =
"Exception in protocol, " ++ BS.unpack msg
-- | Exception throw when remote peer closes connections.
data PeerClosedConnection = PeerClosedConnection
deriving (Show)
instance Exception PeerClosedConnection where
displayException _ = "Remote peer closed the connection"
throwIncorrectUsage :: ByteString -> IO a
throwIncorrectUsage = throwIO . IncorrectUsage
throwProtocolEx :: ByteString -> IO a
throwProtocolEx = throwIO . ProtocolException
throwClosedException :: IO a
throwClosedException = throwIO PeerClosedConnection
eitherToProtocolEx :: Either ByteString a -> IO a
eitherToProtocolEx = either throwProtocolEx pure
-- TODO rename without throw since it actually does not throw exceptions
throwErrorInIO :: Error -> IO (Either Error a)
throwErrorInIO = pure . Left
-- TODO rename without throw since it actually does not throw exceptions
throwAuthErrorInIO :: AuthError -> IO (Either Error a)
throwAuthErrorInIO = pure . Left . AuthError

View File

@ -82,6 +82,7 @@ sendBatchEndBy msg conn qs = do
batch <- constructBatch conn qs
sendEncode conn $ batch <> encodeClientMessage msg
-- INFO about invalid statement in cache
constructBatch :: Connection -> [Query] -> IO Encode
constructBatch conn = fmap fold . traverse constructSingle
where

View File

@ -4,19 +4,23 @@ module Database.PostgreSQL.Driver.RawConnection
, createRawConnection
) where
import Control.Monad (void)
import Control.Monad (void, when)
import Control.Exception (bracketOnError, try)
import Safe (headMay)
import Data.Monoid ((<>))
import Foreign (castPtr, plusPtr)
import System.Socket (socket, AddressInfo(..), getAddressInfo, socketAddress,
aiV4Mapped, AddressInfoException, Socket, connect,
close, receive, send)
import System.Socket.Unsafe (unsafeReceive)
import System.Socket.Family.Inet (Inet)
import System.Socket.Type.Stream (Stream, sendAll)
import System.Socket.Protocol.TCP (TCP)
import System.Socket.Protocol.Default (Default)
import System.Socket.Family.Unix (Unix, socketAddressUnixPath)
import qualified Data.ByteString as B
import qualified Data.ByteString.Internal as B
import qualified Data.ByteString.Unsafe as B
import qualified Data.ByteString.Char8 as BS(pack)
import Database.PostgreSQL.Driver.Error
@ -27,7 +31,8 @@ data RawConnection = RawConnection
{ rFlush :: IO ()
, rClose :: IO ()
, rSend :: B.ByteString -> IO ()
, rReceive :: Int -> IO B.ByteString
-- ByteString that should be prepended to received ByteString
, rReceive :: B.ByteString -> Int -> IO B.ByteString
}
defaultUnixPathDirectory :: B.ByteString
@ -75,6 +80,17 @@ constructRawConnection s = RawConnection
{ rFlush = pure ()
, rClose = close s
, rSend = \msg -> void $ sendAll s msg mempty
, rReceive = \n -> receive s n mempty
, rReceive = rawReceive s
}
{-# INLINE rawReceive #-}
rawReceive :: Socket f Stream p -> B.ByteString -> Int -> IO B.ByteString
rawReceive s bs n = B.unsafeUseAsCStringLen bs $ \(prevPtr, prevLen) ->
let bufSize = prevLen + n
in B.createUptoN bufSize $ \bufPtr -> do
B.memcpy bufPtr (castPtr prevPtr) prevLen
len <- unsafeReceive s (bufPtr `plusPtr` prevLen)
(fromIntegral bufSize) mempty
-- Received empty string means closed connection by the remote host
when (len == 0) throwClosedException
pure $ prevLen + fromIntegral len

View File

@ -10,7 +10,7 @@ import Database.PostgreSQL.Protocol.Types
-- | Prepared statement storage
data StatementStorage = StatementStorage
(H.BasicHashTable StatementSQL StatementName) (IORef Word)
!(H.BasicHashTable StatementSQL StatementName) !(IORef Word)
-- | Cache policy about prepared statements.
data CachePolicy
@ -24,6 +24,8 @@ newStatementStorage = StatementStorage <$> H.new <*> newIORef 0
lookupStatement :: StatementStorage -> StatementSQL -> IO (Maybe StatementName)
lookupStatement (StatementStorage table _) = H.lookup table
-- TODO place right name
-- TODO info about exceptions and mask
storeStatement :: StatementStorage -> StatementSQL -> IO StatementName
storeStatement (StatementStorage table counter) stmt = do
n <- readIORef counter

View File

@ -6,7 +6,7 @@ module Database.PostgreSQL.Protocol.DataRows
, decodeOneRow
) where
import Data.Monoid ((<>))
import Data.Monoid ((<>))
import Data.Word (Word8, byteSwap32)
import Foreign (peek, peekByteOff, castPtr)
import qualified Data.ByteString as B