From b8ca2fd69eda57fbdf1d21216844ea2fd19dd5ac Mon Sep 17 00:00:00 2001 From: Chris Allen Date: Sat, 12 Apr 2014 02:56:33 -0500 Subject: [PATCH] positive validation for bulk API --- Database/Bloodhound/Client.hs | 19 +++++++++++++------ tests/tests.hs | 23 +++++++++++++++++++++++ 2 files changed, 36 insertions(+), 6 deletions(-) diff --git a/Database/Bloodhound/Client.hs b/Database/Bloodhound/Client.hs index 982dffb..34428cd 100644 --- a/Database/Bloodhound/Client.hs +++ b/Database/Bloodhound/Client.hs @@ -15,6 +15,7 @@ module Database.Bloodhound.Client , searchByType , refreshIndex , mkSearch + , bulk , IndexSettings(..) , Server(..) , Reply(..) @@ -54,6 +55,7 @@ module Database.Bloodhound.Client , DocId(..) , CacheName(..) , CacheKey(..) + , BulkOperation(..) ) where @@ -63,7 +65,7 @@ import Data.Aeson import Data.Aeson.TH (deriveJSON) import qualified Data.ByteString.Lazy.Char8 as L import Data.ByteString.Builder -import Data.List (intercalate) +import Data.List (foldl', intercalate, intersperse) import Data.Maybe (fromMaybe) import Data.Monoid import Data.Text (Text) @@ -267,8 +269,16 @@ bulk :: Server -> [BulkOperation] -> IO Reply bulk (Server server) bulkOps = dispatch url method body where url = joinPath [server, "_bulk"] method = NHTM.methodPost - blobs = concat $ fmap getStreamChunk bulkOps - body = Just $ toLazyByteString $ mash (mempty :: Builder) blobs + body = Just $ collapseStream bulkOps + +collapseStream :: [BulkOperation] -> L.ByteString +collapseStream stream = collapsed where + blobs = intersperse "\n" $ concat $ fmap getStreamChunk stream + mashedTaters = mash (mempty :: Builder) blobs + collapsed = toLazyByteString $ mappend mashedTaters "\n" + +mash :: Builder -> [L.ByteString] -> Builder +mash builder xs = foldl' (\b x -> mappend b (lazyByteString x)) builder xs data BulkOperation = BulkIndex IndexName MappingName DocId Value @@ -276,9 +286,6 @@ data BulkOperation = | BulkDelete IndexName MappingName DocId | BulkUpdate IndexName MappingName DocId Value deriving (Eq, Show) -mash :: Builder -> [L.ByteString] -> Builder -mash builder xs = foldr (\x b -> mappend b (lazyByteString x)) builder xs - mkMetadataValue :: Text -> String -> String -> String -> Value mkMetadataValue operation indexName mappingName docId = object [operation .= diff --git a/tests/tests.hs b/tests/tests.hs index 50158f9..e802512 100644 --- a/tests/tests.hs +++ b/tests/tests.hs @@ -81,6 +81,10 @@ searchExpectNoResults search = do let emptyHits = fmap (hits . searchHits) result emptyHits `shouldBe` Right [] +data BulkTest = BulkTest { name :: Text } deriving (Eq, Generic, Show) +instance FromJSON BulkTest +instance ToJSON BulkTest + main :: IO () main = hspec $ do @@ -101,6 +105,25 @@ main = hspec $ do (responseBody docInserted) :: Either String (EsResult Tweet) fmap _source newTweet `shouldBe` Right exampleTweet + describe "bulk API" $ do + it "inserts all documents we request" $ do + _ <- insertData + let firstTest = BulkTest "blah" + let secondTest = BulkTest "bloo" + let firstDoc = BulkIndex (IndexName "twitter") + (MappingName "tweet") (DocId "2") (object ["name" .= String "blah"]) + let secondDoc = BulkCreate (IndexName "twitter") + (MappingName "tweet") (DocId "3") (object ["name" .= String "bloo"]) + let stream = [firstDoc, secondDoc] + response <- bulk testServer stream + _ <- refreshIndex testServer testIndex + fDoc <- getDocument testServer testIndex testMapping (DocId "2") + sDoc <- getDocument testServer testIndex testMapping (DocId "3") + let maybeFirst = eitherDecode $ responseBody fDoc :: Either String (EsResult BulkTest) + let maybeSecond = eitherDecode $ responseBody sDoc :: Either String (EsResult BulkTest) + fmap _source maybeFirst `shouldBe` Right firstTest + fmap _source maybeSecond `shouldBe` Right secondTest + describe "query API" $ do it "returns document for term query and identity filter" $ do _ <- insertData