mirror of
https://github.com/flipstone/orville.git
synced 2024-11-20 12:51:30 +03:00
Make Connections Thread Safe
Issuing multiple commands concurrently on the same connection results in one of those commands hanging indefinitely, possibly due to a dead-lock somewhere in libpq. This introduces an MVar that is used to serialize access to the connection for utilization purposes. The existing semantics around closing connections should be preserved by the use of a second MVar.
This commit is contained in:
parent
b59a7dce4f
commit
0c0a0017a9
@ -34,12 +34,12 @@ module Orville.PostgreSQL.Raw.Connection
|
||||
where
|
||||
|
||||
import Control.Concurrent (getNumCapabilities, threadWaitRead, threadWaitWrite)
|
||||
import Control.Concurrent.MVar (MVar, newMVar, tryReadMVar, tryTakeMVar)
|
||||
import Control.Concurrent.MVar (MVar, newMVar, tryTakeMVar, withMVar)
|
||||
import Control.Exception (Exception, mask, throwIO)
|
||||
import Control.Monad (void)
|
||||
import qualified Data.ByteString as BS
|
||||
import qualified Data.ByteString.Builder as BSB
|
||||
import qualified Data.ByteString.Char8 as B8
|
||||
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
|
||||
import Data.Maybe (fromMaybe)
|
||||
#if MIN_VERSION_resource_pool(0,4,0)
|
||||
import Data.Pool (Pool, newPool, defaultPoolConfig, setNumStripes, withResource)
|
||||
@ -236,12 +236,28 @@ executeRaw connection bs params =
|
||||
Right paramBytes ->
|
||||
underlyingExecute bs paramBytes connection
|
||||
|
||||
data ConnectionState
|
||||
= OpenConnection LibPQ.Connection
|
||||
| ClosedConnection
|
||||
|
||||
data ConnectionContext = ConnectionContext
|
||||
{ i_connUtilizationLock :: MVar ()
|
||||
-- ^ Used to serialize access to the connection for the purpose of issuing commands
|
||||
, i_connCloseLock :: MVar ()
|
||||
-- ^ Used to guarantee that only one thread will close the connection. This
|
||||
-- is separate from the utilization lock because a connection should still
|
||||
-- be closeable if it is in use by another thread but should not be closed
|
||||
-- by multiple threads simultaneously.
|
||||
, i_connState :: ConnectionState
|
||||
-- ^ The underlying connection, if open
|
||||
}
|
||||
|
||||
{- |
|
||||
An Orville handler for a LibPQ connection.
|
||||
|
||||
@since 1.0.0.0
|
||||
-}
|
||||
newtype Connection = Connection (MVar LibPQ.Connection)
|
||||
newtype Connection = Connection (IORef ConnectionContext)
|
||||
|
||||
{- |
|
||||
'connect' is the internal, primitive connection function.
|
||||
@ -275,7 +291,14 @@ connect noticeReporting connString =
|
||||
LibPQ.PollingWriting ->
|
||||
checkSocketAndThreadWait conn threadWaitWrite
|
||||
LibPQ.PollingOk -> do
|
||||
connectionHandle <- newMVar conn
|
||||
connUseLock <- newMVar ()
|
||||
connCloseLock <- newMVar ()
|
||||
connectionHandle <- newIORef
|
||||
ConnectionContext
|
||||
{ i_connUtilizationLock = connUseLock
|
||||
, i_connCloseLock = connCloseLock
|
||||
, i_connState = OpenConnection conn
|
||||
}
|
||||
pure (Connection connectionHandle)
|
||||
in
|
||||
do
|
||||
@ -295,33 +318,34 @@ connect noticeReporting connString =
|
||||
From the previous link, 'tryTakeMVar' is not interruptible, where @takeMVar@
|
||||
*is*. So by using 'tryTakeMVar' along with 'mask', we should be safe from
|
||||
async exceptions causing us to not finish an underlying connection. Notice
|
||||
that the only place the MVar is ever taken is here so 'tryTakeMVar' gives us
|
||||
both the non-blocking semantics to protect from async exceptions with 'mask'
|
||||
_and_ should never truly return an empty unless two threads were racing to
|
||||
close the connection, in which case.. one of them will close the connection.
|
||||
that the only place the close lock MVar is ever taken is here so
|
||||
'tryTakeMVar' gives us both the non-blocking semantics to protect from async
|
||||
exceptions with 'mask' _and_ should never truly return an empty unless two
|
||||
threads were racing to close the connection, in which case.. one of them will
|
||||
close the connection.
|
||||
|
||||
@since 1.0.0.0
|
||||
-}
|
||||
close :: Connection -> IO ()
|
||||
close (Connection handle) =
|
||||
let
|
||||
underlyingFinish :: (forall a. IO a -> IO a) -> IO (Maybe ())
|
||||
underlyingFinish :: (forall a. IO a -> IO a) -> IO ()
|
||||
underlyingFinish restore = do
|
||||
underlyingConnection <- tryTakeMVar handle
|
||||
restore (traverse LibPQ.finish underlyingConnection)
|
||||
connCtx <- readIORef handle
|
||||
mCloseLock <- tryTakeMVar $ i_connCloseLock connCtx
|
||||
case (mCloseLock, i_connState connCtx) of
|
||||
(Just (), OpenConnection underlyingConnection) -> do
|
||||
writeIORef handle connCtx { i_connState = ClosedConnection }
|
||||
restore (LibPQ.finish underlyingConnection)
|
||||
_ -> pure ()
|
||||
in
|
||||
void $ mask underlyingFinish
|
||||
mask underlyingFinish
|
||||
|
||||
{- |
|
||||
'underlyingExecute' is the internal, primitive execute function.
|
||||
|
||||
This is not intended to be directly exposed to end users, but instead wrapped
|
||||
in something using a pool. Note there are potential dragons here in that
|
||||
this calls @tryReadMvar@ and then returns an error if the MVar is not full.
|
||||
The intent is to never expose the ability to empty the `MVar` outside of this
|
||||
module, so unless a connection has been closed it *should* never be empty.
|
||||
And a connection should be closed upon removal from a resource pool (in which
|
||||
case it can't be used for this function in the first place).
|
||||
in something using a pool.
|
||||
|
||||
@since 1.0.0.0
|
||||
-}
|
||||
@ -330,20 +354,20 @@ underlyingExecute ::
|
||||
[Maybe BS.ByteString] ->
|
||||
Connection ->
|
||||
IO LibPQ.Result
|
||||
underlyingExecute bs params connection = do
|
||||
libPQConn <- readLibPQConnectionOrFailIfClosed connection
|
||||
mbResult <-
|
||||
LibPQ.execParams libPQConn bs (map mkInferredTextParam params) LibPQ.Text
|
||||
underlyingExecute bs params connection =
|
||||
withLibPQConnectionOrFailIfClosed connection $ \libPQConn -> do
|
||||
mbResult <-
|
||||
LibPQ.execParams libPQConn bs (map mkInferredTextParam params) LibPQ.Text
|
||||
|
||||
case mbResult of
|
||||
Nothing -> do
|
||||
throwExecutionErrorWithoutResult libPQConn bs
|
||||
Just result -> do
|
||||
execStatus <- LibPQ.resultStatus result
|
||||
case mbResult of
|
||||
Nothing -> do
|
||||
throwExecutionErrorWithoutResult libPQConn bs
|
||||
Just result -> do
|
||||
execStatus <- LibPQ.resultStatus result
|
||||
|
||||
if isRowReadableStatus execStatus
|
||||
then pure result
|
||||
else throwExecutionErrorWithResult result execStatus bs
|
||||
if isRowReadableStatus execStatus
|
||||
then pure result
|
||||
else throwExecutionErrorWithResult result execStatus bs
|
||||
|
||||
{- |
|
||||
Escapes and quotes a string for use as a literal within a SQL command that
|
||||
@ -361,19 +385,19 @@ underlyingExecute bs params connection = do
|
||||
@since 1.0.0.0
|
||||
-}
|
||||
quoteStringLiteral :: Connection -> BS.ByteString -> IO BSB.Builder
|
||||
quoteStringLiteral connection unquotedString = do
|
||||
libPQConn <- readLibPQConnectionOrFailIfClosed connection
|
||||
mbEscapedString <- LibPQ.escapeStringConn libPQConn unquotedString
|
||||
quoteStringLiteral connection unquotedString =
|
||||
withLibPQConnectionOrFailIfClosed connection $ \libPQConn -> do
|
||||
mbEscapedString <- LibPQ.escapeStringConn libPQConn unquotedString
|
||||
|
||||
case mbEscapedString of
|
||||
Nothing ->
|
||||
throwConnectionError "Error while escaping string literal" libPQConn
|
||||
Just escapedString ->
|
||||
let
|
||||
singleQuote =
|
||||
BSB.char8 '\''
|
||||
in
|
||||
pure (singleQuote <> BSB.byteString escapedString <> singleQuote)
|
||||
case mbEscapedString of
|
||||
Nothing ->
|
||||
throwConnectionError "Error while escaping string literal" libPQConn
|
||||
Just escapedString ->
|
||||
let
|
||||
singleQuote =
|
||||
BSB.char8 '\''
|
||||
in
|
||||
pure (singleQuote <> BSB.byteString escapedString <> singleQuote)
|
||||
|
||||
{- |
|
||||
Escapes and quotes a string for use as an identifier within a SQL command
|
||||
@ -388,25 +412,33 @@ quoteStringLiteral connection unquotedString = do
|
||||
@since 1.0.0.0
|
||||
-}
|
||||
quoteIdentifier :: Connection -> BS.ByteString -> IO BSB.Builder
|
||||
quoteIdentifier connection unquotedString = do
|
||||
libPQConn <- readLibPQConnectionOrFailIfClosed connection
|
||||
mbEscapedString <- LibPQ.escapeIdentifier libPQConn unquotedString
|
||||
quoteIdentifier connection unquotedString =
|
||||
withLibPQConnectionOrFailIfClosed connection $ \libPQConn -> do
|
||||
mbEscapedString <- LibPQ.escapeIdentifier libPQConn unquotedString
|
||||
|
||||
case mbEscapedString of
|
||||
Nothing ->
|
||||
throwConnectionError "Error while escaping identifier" libPQConn
|
||||
Just quotedString ->
|
||||
pure (BSB.byteString quotedString)
|
||||
case mbEscapedString of
|
||||
Nothing ->
|
||||
throwConnectionError "Error while escaping identifier" libPQConn
|
||||
Just quotedString ->
|
||||
pure (BSB.byteString quotedString)
|
||||
|
||||
readLibPQConnectionOrFailIfClosed :: Connection -> IO LibPQ.Connection
|
||||
readLibPQConnectionOrFailIfClosed (Connection handle) = do
|
||||
mbConn <- tryReadMVar handle
|
||||
{- |
|
||||
Serializes access to the underlying LibPQ connection. This is necessary
|
||||
because multiple concurrent commands issued using the same connection will
|
||||
result in a dead-lock in LibPQ.
|
||||
|
||||
case mbConn of
|
||||
Nothing ->
|
||||
throwIO ConnectionUsedAfterCloseError
|
||||
Just conn ->
|
||||
pure conn
|
||||
Do not nest calls to this function with the same connection or it will
|
||||
dead-lock on the MVar.
|
||||
-}
|
||||
withLibPQConnectionOrFailIfClosed :: Connection -> (LibPQ.Connection -> IO a) -> IO a
|
||||
withLibPQConnectionOrFailIfClosed (Connection handle) withConnection = do
|
||||
connCtx <- readIORef handle
|
||||
withMVar (i_connUtilizationLock connCtx) $ \() ->
|
||||
case i_connState connCtx of
|
||||
ClosedConnection ->
|
||||
throwIO ConnectionUsedAfterCloseError
|
||||
OpenConnection conn ->
|
||||
withConnection conn
|
||||
|
||||
throwConnectionError :: String -> LibPQ.Connection -> IO a
|
||||
throwConnectionError message conn = do
|
||||
|
Loading…
Reference in New Issue
Block a user