mirror of
synced 2025-01-07 15:22:21 +03:00
Expose low-level scroll API
Changes in this commit: - Put a note on scanSearch's runtime properties - Resolve a "todo" making ScrollId a proper newtype. - Add low level api calls for getInitialScroll and advanceScroll, with which scanSearch is now implemented. 'scanSearch' provides a high level but pretty limited functionality. It scans over the entire search range and appends hits into a list. There are a few problems with this approach: 1. It uses lazy lists and O(n) appends on each page fetched. This probably isn't a big deal for small results sets but for larger ones this could get inefficient. Which brings me to 2. It collects all the results in memory, so in general its not suitable for large data sets anyways. 3. The scroll window was fixed to 1 minute. That's a reasonable default but this isn't a decision that the library can make categorically. 4. *only* scanSearch was exported, making it impossible to perform a scrolling search if you wanted something more efficient. This change leaves scanSearch how it is but adds getInitialScroll and advanceScroll which can be easily used to create a pipes/conduits/what-have-you stream of hits. The only semantic changes are: 1. ScrollId is now a newtype as originally intended by the feature's author. 2. MonadThrow has been introduced into the type signature for the (AFAIK) impossible edge case of the server returning a response that can't be parsed as an EsError.
This commit is contained in:
@ -42,6 +42,8 @@ module Database.Bloodhound.Client
, searchByIndex
, searchByType
, scanSearch
, getInitialScroll
, advanceScroll
, refreshIndex
, mkSearch
, mkAggregateSearch
@ -78,6 +80,7 @@ import Data.Monoid
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Data.Time.Clock
import qualified Data.Vector as V
import Network.HTTP.Client
import qualified Network.HTTP.Types.Method as NHTM
@ -608,8 +611,12 @@ searchByType (IndexName indexName)
(MappingName mappingName) = bindM2 dispatchSearch url . return
where url = joinPath [indexName, mappingName, "_search"]
scanSearch' :: MonadBH m => IndexName -> MappingName -> Search -> m (Maybe ScrollId)
scanSearch' (IndexName indexName) (MappingName mappingName) search = do
-- | For a given scearch, request a scroll for efficient streaming of
-- search results. Note that the search is put into 'SearchTypeScan'
-- mode and thus results will not be sorted. Combine this with
-- 'advanceScroll' to efficiently stream through the full result set
getInitialScroll :: MonadBH m => IndexName -> MappingName -> Search -> m (Maybe ScrollId)
getInitialScroll (IndexName indexName) (MappingName mappingName) search = do
let url = joinPath [indexName, mappingName, "_search"]
search' = search { searchType = SearchTypeScan }
resp' <- bindM2 dispatchSearch url (return search')
@ -617,31 +624,54 @@ scanSearch' (IndexName indexName) (MappingName mappingName) search = do
msid = maybe Nothing scrollId msr
return msid
scroll' :: (FromJSON a, MonadBH m) => Maybe ScrollId -> m ([Hit a], Maybe ScrollId)
scroll' :: (FromJSON a, MonadBH m, MonadThrow m) => Maybe ScrollId -> m ([Hit a], Maybe ScrollId)
scroll' Nothing = return ([], Nothing)
scroll' (Just sid) = do
url <- joinPath ["_search/scroll?scroll=1m"]
resp' <- post url (Just . L.fromStrict $ T.encodeUtf8 sid)
let msr = decode' $ responseBody resp' :: FromJSON a => Maybe (SearchResult a)
resp = case msr of
Just sr -> (hits $ searchHits sr, scrollId sr)
_ -> ([], Nothing)
return resp
res <- advanceScroll sid 60
case res of
Right SearchResult {..} -> return (hits searchHits, scrollId)
Left _ -> return ([], Nothing)
simpleAccumilator :: (FromJSON a, MonadBH m) => [Hit a] -> ([Hit a], Maybe ScrollId) -> m ([Hit a], Maybe ScrollId)
simpleAccumilator oldHits (newHits, Nothing) = return (oldHits ++ newHits, Nothing)
simpleAccumilator oldHits ([], _) = return (oldHits, Nothing)
simpleAccumilator oldHits (newHits, msid) = do
-- | Use the given scroll to fetch the next page of documents. If
-- there are still further pages, there will be a value in the
-- 'scrollId' field of the 'SearchResult'
:: ( FromJSON a
, MonadBH m
, MonadThrow m
=> ScrollId
-> NominalDiffTime
-- ^ How long should the snapshot of data be kept around? This timeout is updated every time 'advanceScroll' is used, so don't feel the need to set it to the entire duration of your search processing. Note that durations < 1s will be rounded up. Also note that 'NominalDiffTime' is an instance of Num so literals like 60 will be interpreted as seconds. 60s is a reasonable default.
-> m (Either EsError (SearchResult a))
advanceScroll (ScrollId sid) scroll = do
url <- joinPath ["_search/scroll?scroll=" <> scrollTime]
parseEsResponse =<< post url (Just . L.fromStrict $ T.encodeUtf8 sid)
where scrollTime = showText secs <> "s"
secs :: Integer
secs = round scroll
simpleAccumulator :: (FromJSON a, MonadBH m, MonadThrow m) => [Hit a] -> ([Hit a], Maybe ScrollId) -> m ([Hit a], Maybe ScrollId)
simpleAccumulator oldHits (newHits, Nothing) = return (oldHits ++ newHits, Nothing)
simpleAccumulator oldHits ([], _) = return (oldHits, Nothing)
simpleAccumulator oldHits (newHits, msid) = do
(newHits', msid') <- scroll' msid
simpleAccumilator (oldHits ++ newHits) (newHits', msid')
simpleAccumulator (oldHits ++ newHits) (newHits', msid')
-- | 'scanSearch' uses the 'scan&scroll' API of elastic,
-- for a given 'IndexName' and 'MappingName',
scanSearch :: (FromJSON a, MonadBH m) => IndexName -> MappingName -> Search -> m [Hit a]
-- for a given 'IndexName' and 'MappingName'. Note that this will
-- consume the entire search result set and will be doing O(n) list
-- appends so this may not be suitable for large result sets. In that
-- case, 'getInitialScroll' and 'advanceScroll' are good low level
-- tools. You should be able to hook them up trivially to conduit,
-- pipes, or your favorite streaming IO abstraction of choice. Note
-- that ordering on the search would destroy performance and thus is
-- ignored.
scanSearch :: (FromJSON a, MonadBH m, MonadThrow m) => IndexName -> MappingName -> Search -> m [Hit a]
scanSearch indexName mappingName search = do
msid <- scanSearch' indexName mappingName search
msid <- getInitialScroll indexName mappingName search
(hits, msid') <- scroll' msid
(totalHits, _) <- simpleAccumilator [] (hits, msid')
(totalHits, _) <- simpleAccumulator [] (hits, msid')
return totalHits
-- | 'mkSearch' is a helper function for defaulting additional fields of a 'Search'
@ -83,7 +83,7 @@ module Database.Bloodhound.Types
, Search(..)
, SearchType(..)
, SearchResult(..)
, ScrollId
, ScrollId(..)
, SearchHits(..)
, TrackSortScores
, From(..)
@ -1332,7 +1332,7 @@ data SearchResult a =
, aggregations :: Maybe AggregationResults
, scrollId :: Maybe ScrollId } deriving (Eq, Show)
type ScrollId = Text -- Fixme: Newtype
newtype ScrollId = ScrollId Text deriving (Eq, Show, Ord, ToJSON, FromJSON)
type Score = Maybe Double
Reference in New Issue
Block a user