Add optimistic concurrency control version feature

This is a first attempt at addressing #44
This commit is contained in:
Michael Xavier 2015-07-10 11:29:10 -07:00
parent c7e0412d67
commit 2d8be0d3e7
3 changed files with 184 additions and 11 deletions

View File

@ -47,6 +47,10 @@ module Database.Bloodhound.Client
, getStatus
, encodeBulkOperations
, encodeBulkOperation
-- * Reply-handling tools
, isVersionConflict
, isSuccess
, isCreated
)
where
@ -58,7 +62,9 @@ import Data.Aeson
import Data.ByteString.Lazy.Builder
import qualified Data.ByteString.Lazy.Char8 as L
import Data.Default.Class
import Data.Ix
import Data.Maybe (fromMaybe)
import Data.Monoid
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
@ -160,6 +166,15 @@ joinPath ps = do
Server s <- bhServer <$> getBHEnv
return $ joinPath' (s:ps)
-- | Severely dumbed down query renderer. Assumes your data doesn't
-- need any encoding
addQuery :: [(Text, Text)] -> Text -> Text
addQuery ps u = u <> "?" <> params
where
params = T.intercalate "&" (uncurry mkParam <$> ps)
mkParam k v = k <> "=" <> v
bindM2 :: (Applicative m, Monad m) => (a -> b -> m c) -> m a -> m b -> m c
bindM2 f ma mb = join (f <$> ma <*> mb)
@ -315,15 +330,23 @@ deleteMapping (IndexName indexName)
-- convert into a JSON 'Value'. The 'DocId' will function as the
-- primary key for the document.
--
-- >>> resp <- runBH' $ indexDocument testIndex testMapping exampleTweet (DocId "1")
-- >>> resp <- runBH' $ indexDocument defaultIndexDocumentSettings testIndex testMapping exampleTweet (DocId "1")
-- >>> print resp
-- Response {responseStatus = Status {statusCode = 201, statusMessage = "Created"}, responseVersion = HTTP/1.1, responseHeaders = [("Content-Type","application/json; charset=UTF-8"),("Content-Length","74")], responseBody = "{\"_index\":\"twitter\",\"_type\":\"tweet\",\"_id\":\"1\",\"_version\":1,\"created\":true}", responseCookieJar = CJ {expose = []}, responseClose' = ResponseClose}
indexDocument :: (ToJSON doc, MonadBH m) => IndexName -> MappingName
-> doc -> DocId -> m Reply
-> IndexDocumentSettings -> doc -> DocId -> m Reply
indexDocument (IndexName indexName)
(MappingName mappingName) document (DocId docId) =
(MappingName mappingName) cfg document (DocId docId) =
bindM2 put url (return body)
where url = joinPath [indexName, mappingName, docId]
where url = addQuery params <$> joinPath [indexName, mappingName, docId]
params = case idsVersionControl cfg of
NoVersionControl -> []
InternalVersion v -> versionParams v "internal"
ExternalGT (ExternalDocVersion v) -> versionParams v "external_gt"
ExternalGTE (ExternalDocVersion v) -> versionParams v "external_gte"
ForceVersion (ExternalDocVersion v) -> versionParams v "force"
vt = T.pack . show . docVersionNumber
versionParams v t = [("version", vt v), ("version_type", t)]
body = Just (encode document)
-- | 'deleteDocument' is the primary way to delete a single document.
@ -536,3 +559,17 @@ setURI req URI{..} = do
Scheme "https" -> True
_ -> False
theQueryString = [(k , Just v) | (k, v) <- queryPairs uriQuery]
-- | Was there an optimistic concurrency control conflict when
-- indexing a document?
isVersionConflict :: Reply -> Bool
isVersionConflict = statusCheck (== 409)
isSuccess :: Reply -> Bool
isSuccess = statusCheck (inRange (200, 299))
isCreated :: Reply -> Bool
isCreated = statusCheck (== 201)
statusCheck :: (Int -> Bool) -> Reply -> Bool
statusCheck prd = prd . NHTS.statusCode . responseStatus

View File

@ -29,6 +29,7 @@
module Database.Bloodhound.Types
( defaultCache
, defaultIndexSettings
, defaultIndexDocumentSettings
, mkSort
, showText
, unpackId
@ -41,6 +42,8 @@ module Database.Bloodhound.Types
, mkTermsAggregation
, mkTermsScriptAggregation
, mkDateHistogram
, mkDocVersion
, docVersionNumber
, toTerms
, toDateHistogram
, omitNulls
@ -56,6 +59,10 @@ module Database.Bloodhound.Types
, Server(..)
, Reply
, EsResult(..)
, DocVersion
, ExternalDocVersion(..)
, VersionControl(..)
, IndexDocumentSettings(..)
, Query(..)
, Search(..)
, SearchResult(..)
@ -218,10 +225,12 @@ import qualified Data.ByteString.Lazy.Char8 as L
import Data.List (nub)
import Data.List.NonEmpty (NonEmpty (..), toList)
import qualified Data.Map.Strict as M
import Data.Maybe
import Data.Text (Text)
import qualified Data.Text as T
import Data.Time.Clock (UTCTime)
import qualified Data.Vector as V
import GHC.Enum
import GHC.Generics (Generic)
import Network.HTTP.Client
import qualified Network.HTTP.Types.Method as NHTM
@ -371,10 +380,78 @@ data BulkOperation =
data EsResult a = EsResult { _index :: Text
, _type :: Text
, _id :: Text
, _version :: Int
, _version :: DocVersion
, found :: Maybe Bool
, _source :: a } deriving (Eq, Show)
{-| 'DocVersion' is an integer version number for a document between 1
and 9.2e+18 used for <<https://www.elastic.co/guide/en/elasticsearch/guide/current/optimistic-concurrency-control.html optimistic concurrency control>>.
-}
newtype DocVersion = DocVersion {
docVersionNumber :: Int
} deriving (Eq, Show, Ord, ToJSON)
-- | Smart constructor for in-range doc version
mkDocVersion :: Int -> Maybe DocVersion
mkDocVersion i
| i >= (docVersionNumber minBound) && i <= (docVersionNumber maxBound) =
Just $ DocVersion i
| otherwise = Nothing
{-| 'ExternalDocVersion' is a convenience wrapper if your code uses its
own version numbers instead of ones from ES.
-}
newtype ExternalDocVersion = ExternalDocVersion DocVersion
deriving (Eq, Show, Ord, Bounded, Enum, ToJSON)
{-| 'VersionControl' is specified when indexing documents as a
optimistic concurrency control.
-}
data VersionControl = NoVersionControl
-- ^ Don't send a version. This is a pure overwrite.
| InternalVersion DocVersion
-- ^ Use the default ES versioning scheme. Only
-- index the document if the version is the same
-- as the one specified. Only applicable to
-- updates, as you should be getting Version from
-- a search result.
| ExternalGT ExternalDocVersion
-- ^ Use your own version numbering. Only index
-- the document if the version is strictly higher
-- OR the document doesn't exist. The given
-- version will be used as the new version number
-- for the stored document. N.B. All updates must
-- increment this number, meaning there is some
-- global, external ordering of updates.
| ExternalGTE ExternalDocVersion
-- ^ Use your own version numbering. Only index
-- the document if the version is equal or higher
-- than the stored version. Will succeed if there
-- is no existing document. The given version will
-- be used as the new version number for the
-- stored document. Use with care, as this could
-- result in data loss.
| ForceVersion ExternalDocVersion
-- ^ The document will always be indexed and the
-- given version will be the new version. This is
-- typically used for correcting errors. Use with
-- care, as this could result in data loss.
deriving (Show, Eq, Ord)
{-| 'IndexDocumentSettings' are special settings supplied when indexing
a document. For the best backwards compatiblity when new fields are
added, you should probably prefer to start with 'defaultIndexDocumentSettings'
-}
data IndexDocumentSettings = IndexDocumentSettings {
idsVersionControl :: VersionControl
}
{-| Reasonable default settings. Chooses no version control.
-}
defaultIndexDocumentSettings :: IndexDocumentSettings
defaultIndexDocumentSettings = IndexDocumentSettings NoVersionControl
{-| 'Sort' is a synonym for a list of 'SortSpec's. Sort behavior is order
dependent with later sorts acting as tie-breakers for earlier sorts.
-}
@ -2173,3 +2250,26 @@ instance FromJSON ShardResult where
v .: "successful" <*>
v .: "failed"
parseJSON _ = empty
instance FromJSON DocVersion where
parseJSON v = do
i <- parseJSON v
maybe (fail "DocVersion out of range") return $ mkDocVersion i
instance Bounded DocVersion where
minBound = DocVersion 1
maxBound = DocVersion 9200000000000000000 -- 9.2e+18
instance Enum DocVersion where
succ x
| x /= maxBound = DocVersion (succ $ docVersionNumber x)
| otherwise = succError "DocVersion"
pred x
| x /= minBound = DocVersion (pred $ docVersionNumber x)
| otherwise = predError "DocVersion"
toEnum i =
fromMaybe (error $ show i ++ " out of DocVersion range") $ mkDocVersion i
fromEnum = docVersionNumber
enumFrom = boundedEnumFrom
enumFromThen = boundedEnumFromThen

View File

@ -25,7 +25,7 @@ import Network.HTTP.Client
import qualified Network.HTTP.Types.Status as NHTS
import Prelude hiding (filter)
import Test.Hspec
import Test.QuickCheck.Property.Monoid
import Test.QuickCheck.Property.Monoid (prop_Monoid, eq, T(..))
import Test.Hspec.QuickCheck (prop)
import Test.QuickCheck
@ -147,18 +147,27 @@ otherTweet = Tweet { user = "notmyapp"
, age = 1000
, location = Location 40.12 (-71.34) }
insertData :: BH IO ()
insertData = do
resetIndex :: BH IO ()
resetIndex = do
_ <- deleteExampleIndex
_ <- createExampleIndex
_ <- putMapping testIndex testMapping TweetMapping
_ <- indexDocument testIndex testMapping exampleTweet (DocId "1")
_ <- refreshIndex testIndex
return ()
insertData :: BH IO Reply
insertData = do
resetIndex
insertData' defaultIndexDocumentSettings
insertData' :: IndexDocumentSettings -> BH IO Reply
insertData' ids = do
r <- indexDocument testIndex testMapping ids exampleTweet (DocId "1")
_ <- refreshIndex testIndex
return r
insertOther :: BH IO ()
insertOther = do
_ <- indexDocument testIndex testMapping otherTweet (DocId "2")
_ <- indexDocument testIndex testMapping defaultIndexDocumentSettings otherTweet (DocId "2")
_ <- refreshIndex testIndex
return ()
@ -290,6 +299,14 @@ main = hspec $ do
(responseBody docInserted) :: Either String (EsResult Tweet)
liftIO $ (fmap _source newTweet `shouldBe` Right exampleTweet)
it "can use optimistic concurrency control" $ withTestEnv $ do
let ev = ExternalDocVersion minBound
let cfg = defaultIndexDocumentSettings { idsVersionControl = ExternalGT ev }
resetIndex
res <- insertData' cfg
liftIO $ isCreated res `shouldBe` True
res' <- insertData' cfg
liftIO $ isVersionConflict res' `shouldBe` True
describe "bulk API" $ do
it "inserts all documents we request" $ withTestEnv $ do
@ -659,3 +676,22 @@ main = hspec $ do
describe "Monoid (SearchHits a)" $ do
prop "abides the monoid laws" $ eq $
prop_Monoid (T :: T (SearchHits ()))
describe "mkDocVersion" $ do
prop "can never construct an out of range docVersion" $ \i ->
let res = mkDocVersion i
in case res of
Nothing -> property True
Just dv -> (dv >= minBound) .&&.
(dv <= maxBound) .&&.
docVersionNumber dv === i
describe "Enum DocVersion" $ do
it "follows the laws of Enum, Bounded" $ do
return (succ maxBound :: DocVersion) `shouldThrow` anyErrorCall
return (pred minBound :: DocVersion) `shouldThrow` anyErrorCall
return (toEnum 0 :: DocVersion) `shouldThrow` anyErrorCall
return (toEnum 9200000000000000001 :: DocVersion) `shouldThrow` anyErrorCall
enumFrom (pred maxBound :: DocVersion) `shouldBe` [pred maxBound, maxBound]
enumFrom (pred maxBound :: DocVersion) `shouldBe` [pred maxBound, maxBound]
enumFromThen minBound (pred maxBound :: DocVersion) `shouldBe` [minBound, pred maxBound, maxBound]