Merge pull request #81 from Soostone/low-level-scroll

Low level scroll
This commit is contained in:
Chris Allen 2015-11-14 21:36:01 -06:00
commit 312fad2cfa
2 changed files with 52 additions and 22 deletions

View File

@ -42,6 +42,8 @@ module Database.Bloodhound.Client
, searchByIndex
, searchByType
, scanSearch
, getInitialScroll
, advanceScroll
, refreshIndex
, mkSearch
, mkAggregateSearch
@ -78,6 +80,7 @@ import Data.Monoid
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Data.Time.Clock
import qualified Data.Vector as V
import Network.HTTP.Client
import qualified Network.HTTP.Types.Method as NHTM
@ -289,7 +292,7 @@ existentialQuery url = do
-- failing that tries to parse it as an EsError. All well-formed, JSON
-- responses from elasticsearch should fall into these two
-- categories. If they don't, a 'StatusCodeException' will be thrown.
parseEsResponse :: (MonadBH m, MonadThrow m, FromJSON a) => Reply
parseEsResponse :: (MonadThrow m, FromJSON a) => Reply
-> m (Either EsError a)
parseEsResponse reply
| respIsTwoHunna reply = case eitherDecode body of
@ -608,8 +611,12 @@ searchByType (IndexName indexName)
(MappingName mappingName) = bindM2 dispatchSearch url . return
where url = joinPath [indexName, mappingName, "_search"]
scanSearch' :: MonadBH m => IndexName -> MappingName -> Search -> m (Maybe ScrollId)
scanSearch' (IndexName indexName) (MappingName mappingName) search = do
-- | For a given scearch, request a scroll for efficient streaming of
-- search results. Note that the search is put into 'SearchTypeScan'
-- mode and thus results will not be sorted. Combine this with
-- 'advanceScroll' to efficiently stream through the full result set
getInitialScroll :: MonadBH m => IndexName -> MappingName -> Search -> m (Maybe ScrollId)
getInitialScroll (IndexName indexName) (MappingName mappingName) search = do
let url = joinPath [indexName, mappingName, "_search"]
search' = search { searchType = SearchTypeScan }
resp' <- bindM2 dispatchSearch url (return search')
@ -617,31 +624,54 @@ scanSearch' (IndexName indexName) (MappingName mappingName) search = do
msid = maybe Nothing scrollId msr
return msid
scroll' :: (FromJSON a, MonadBH m) => Maybe ScrollId -> m ([Hit a], Maybe ScrollId)
scroll' :: (FromJSON a, MonadBH m, MonadThrow m) => Maybe ScrollId -> m ([Hit a], Maybe ScrollId)
scroll' Nothing = return ([], Nothing)
scroll' (Just sid) = do
url <- joinPath ["_search/scroll?scroll=1m"]
resp' <- post url (Just . L.fromStrict $ T.encodeUtf8 sid)
let msr = decode' $ responseBody resp' :: FromJSON a => Maybe (SearchResult a)
resp = case msr of
Just sr -> (hits $ searchHits sr, scrollId sr)
_ -> ([], Nothing)
return resp
res <- advanceScroll sid 60
case res of
Right SearchResult {..} -> return (hits searchHits, scrollId)
Left _ -> return ([], Nothing)
simpleAccumilator :: (FromJSON a, MonadBH m) => [Hit a] -> ([Hit a], Maybe ScrollId) -> m ([Hit a], Maybe ScrollId)
simpleAccumilator oldHits (newHits, Nothing) = return (oldHits ++ newHits, Nothing)
simpleAccumilator oldHits ([], _) = return (oldHits, Nothing)
simpleAccumilator oldHits (newHits, msid) = do
-- | Use the given scroll to fetch the next page of documents. If
-- there are still further pages, there will be a value in the
-- 'scrollId' field of the 'SearchResult'
advanceScroll
:: ( FromJSON a
, MonadBH m
, MonadThrow m
)
=> ScrollId
-> NominalDiffTime
-- ^ How long should the snapshot of data be kept around? This timeout is updated every time 'advanceScroll' is used, so don't feel the need to set it to the entire duration of your search processing. Note that durations < 1s will be rounded up. Also note that 'NominalDiffTime' is an instance of Num so literals like 60 will be interpreted as seconds. 60s is a reasonable default.
-> m (Either EsError (SearchResult a))
advanceScroll (ScrollId sid) scroll = do
url <- joinPath ["_search/scroll?scroll=" <> scrollTime]
parseEsResponse =<< post url (Just . L.fromStrict $ T.encodeUtf8 sid)
where scrollTime = showText secs <> "s"
secs :: Integer
secs = round scroll
simpleAccumulator :: (FromJSON a, MonadBH m, MonadThrow m) => [Hit a] -> ([Hit a], Maybe ScrollId) -> m ([Hit a], Maybe ScrollId)
simpleAccumulator oldHits (newHits, Nothing) = return (oldHits ++ newHits, Nothing)
simpleAccumulator oldHits ([], _) = return (oldHits, Nothing)
simpleAccumulator oldHits (newHits, msid) = do
(newHits', msid') <- scroll' msid
simpleAccumilator (oldHits ++ newHits) (newHits', msid')
simpleAccumulator (oldHits ++ newHits) (newHits', msid')
-- | 'scanSearch' uses the 'scan&scroll' API of elastic,
-- for a given 'IndexName' and 'MappingName',
scanSearch :: (FromJSON a, MonadBH m) => IndexName -> MappingName -> Search -> m [Hit a]
-- for a given 'IndexName' and 'MappingName'. Note that this will
-- consume the entire search result set and will be doing O(n) list
-- appends so this may not be suitable for large result sets. In that
-- case, 'getInitialScroll' and 'advanceScroll' are good low level
-- tools. You should be able to hook them up trivially to conduit,
-- pipes, or your favorite streaming IO abstraction of choice. Note
-- that ordering on the search would destroy performance and thus is
-- ignored.
scanSearch :: (FromJSON a, MonadBH m, MonadThrow m) => IndexName -> MappingName -> Search -> m [Hit a]
scanSearch indexName mappingName search = do
msid <- scanSearch' indexName mappingName search
msid <- getInitialScroll indexName mappingName search
(hits, msid') <- scroll' msid
(totalHits, _) <- simpleAccumilator [] (hits, msid')
(totalHits, _) <- simpleAccumulator [] (hits, msid')
return totalHits
-- | 'mkSearch' is a helper function for defaulting additional fields of a 'Search'

View File

@ -83,7 +83,7 @@ module Database.Bloodhound.Types
, Search(..)
, SearchType(..)
, SearchResult(..)
, ScrollId
, ScrollId(..)
, SearchHits(..)
, TrackSortScores
, From(..)
@ -1332,7 +1332,7 @@ data SearchResult a =
, aggregations :: Maybe AggregationResults
, scrollId :: Maybe ScrollId } deriving (Eq, Show)
type ScrollId = Text -- Fixme: Newtype
newtype ScrollId = ScrollId Text deriving (Eq, Show, Ord, ToJSON, FromJSON)
type Score = Maybe Double