mirror of
https://github.com/typeable/bloodhound.git
synced 2025-01-05 21:36:03 +03:00
positive validation for bulk API
This commit is contained in:
parent
6b50fdcf72
commit
b8ca2fd69e
@ -15,6 +15,7 @@ module Database.Bloodhound.Client
|
||||
, searchByType
|
||||
, refreshIndex
|
||||
, mkSearch
|
||||
, bulk
|
||||
, IndexSettings(..)
|
||||
, Server(..)
|
||||
, Reply(..)
|
||||
@ -54,6 +55,7 @@ module Database.Bloodhound.Client
|
||||
, DocId(..)
|
||||
, CacheName(..)
|
||||
, CacheKey(..)
|
||||
, BulkOperation(..)
|
||||
)
|
||||
where
|
||||
|
||||
@ -63,7 +65,7 @@ import Data.Aeson
|
||||
import Data.Aeson.TH (deriveJSON)
|
||||
import qualified Data.ByteString.Lazy.Char8 as L
|
||||
import Data.ByteString.Builder
|
||||
import Data.List (intercalate)
|
||||
import Data.List (foldl', intercalate, intersperse)
|
||||
import Data.Maybe (fromMaybe)
|
||||
import Data.Monoid
|
||||
import Data.Text (Text)
|
||||
@ -267,8 +269,16 @@ bulk :: Server -> [BulkOperation] -> IO Reply
|
||||
bulk (Server server) bulkOps = dispatch url method body where
|
||||
url = joinPath [server, "_bulk"]
|
||||
method = NHTM.methodPost
|
||||
blobs = concat $ fmap getStreamChunk bulkOps
|
||||
body = Just $ toLazyByteString $ mash (mempty :: Builder) blobs
|
||||
body = Just $ collapseStream bulkOps
|
||||
|
||||
collapseStream :: [BulkOperation] -> L.ByteString
|
||||
collapseStream stream = collapsed where
|
||||
blobs = intersperse "\n" $ concat $ fmap getStreamChunk stream
|
||||
mashedTaters = mash (mempty :: Builder) blobs
|
||||
collapsed = toLazyByteString $ mappend mashedTaters "\n"
|
||||
|
||||
mash :: Builder -> [L.ByteString] -> Builder
|
||||
mash builder xs = foldl' (\b x -> mappend b (lazyByteString x)) builder xs
|
||||
|
||||
data BulkOperation =
|
||||
BulkIndex IndexName MappingName DocId Value
|
||||
@ -276,9 +286,6 @@ data BulkOperation =
|
||||
| BulkDelete IndexName MappingName DocId
|
||||
| BulkUpdate IndexName MappingName DocId Value deriving (Eq, Show)
|
||||
|
||||
mash :: Builder -> [L.ByteString] -> Builder
|
||||
mash builder xs = foldr (\x b -> mappend b (lazyByteString x)) builder xs
|
||||
|
||||
mkMetadataValue :: Text -> String -> String -> String -> Value
|
||||
mkMetadataValue operation indexName mappingName docId =
|
||||
object [operation .=
|
||||
|
@ -81,6 +81,10 @@ searchExpectNoResults search = do
|
||||
let emptyHits = fmap (hits . searchHits) result
|
||||
emptyHits `shouldBe` Right []
|
||||
|
||||
data BulkTest = BulkTest { name :: Text } deriving (Eq, Generic, Show)
|
||||
instance FromJSON BulkTest
|
||||
instance ToJSON BulkTest
|
||||
|
||||
main :: IO ()
|
||||
main = hspec $ do
|
||||
|
||||
@ -101,6 +105,25 @@ main = hspec $ do
|
||||
(responseBody docInserted) :: Either String (EsResult Tweet)
|
||||
fmap _source newTweet `shouldBe` Right exampleTweet
|
||||
|
||||
describe "bulk API" $ do
|
||||
it "inserts all documents we request" $ do
|
||||
_ <- insertData
|
||||
let firstTest = BulkTest "blah"
|
||||
let secondTest = BulkTest "bloo"
|
||||
let firstDoc = BulkIndex (IndexName "twitter")
|
||||
(MappingName "tweet") (DocId "2") (object ["name" .= String "blah"])
|
||||
let secondDoc = BulkCreate (IndexName "twitter")
|
||||
(MappingName "tweet") (DocId "3") (object ["name" .= String "bloo"])
|
||||
let stream = [firstDoc, secondDoc]
|
||||
response <- bulk testServer stream
|
||||
_ <- refreshIndex testServer testIndex
|
||||
fDoc <- getDocument testServer testIndex testMapping (DocId "2")
|
||||
sDoc <- getDocument testServer testIndex testMapping (DocId "3")
|
||||
let maybeFirst = eitherDecode $ responseBody fDoc :: Either String (EsResult BulkTest)
|
||||
let maybeSecond = eitherDecode $ responseBody sDoc :: Either String (EsResult BulkTest)
|
||||
fmap _source maybeFirst `shouldBe` Right firstTest
|
||||
fmap _source maybeSecond `shouldBe` Right secondTest
|
||||
|
||||
describe "query API" $ do
|
||||
it "returns document for term query and identity filter" $ do
|
||||
_ <- insertData
|
||||
|
Loading…
Reference in New Issue
Block a user