diff --git a/src/Database/Bloodhound/Client.hs b/src/Database/Bloodhound/Client.hs index 1d2704f..b349cfa 100644 --- a/src/Database/Bloodhound/Client.hs +++ b/src/Database/Bloodhound/Client.hs @@ -42,6 +42,8 @@ module Database.Bloodhound.Client , searchByIndex , searchByType , scanSearch + , getInitialScroll + , advanceScroll , refreshIndex , mkSearch , mkAggregateSearch @@ -78,6 +80,7 @@ import Data.Monoid import Data.Text (Text) import qualified Data.Text as T import qualified Data.Text.Encoding as T +import Data.Time.Clock import qualified Data.Vector as V import Network.HTTP.Client import qualified Network.HTTP.Types.Method as NHTM @@ -289,7 +292,7 @@ existentialQuery url = do -- failing that tries to parse it as an EsError. All well-formed, JSON -- responses from elasticsearch should fall into these two -- categories. If they don't, a 'StatusCodeException' will be thrown. -parseEsResponse :: (MonadBH m, MonadThrow m, FromJSON a) => Reply +parseEsResponse :: (MonadThrow m, FromJSON a) => Reply -> m (Either EsError a) parseEsResponse reply | respIsTwoHunna reply = case eitherDecode body of @@ -608,8 +611,12 @@ searchByType (IndexName indexName) (MappingName mappingName) = bindM2 dispatchSearch url . return where url = joinPath [indexName, mappingName, "_search"] -scanSearch' :: MonadBH m => IndexName -> MappingName -> Search -> m (Maybe ScrollId) -scanSearch' (IndexName indexName) (MappingName mappingName) search = do +-- | For a given scearch, request a scroll for efficient streaming of +-- search results. Note that the search is put into 'SearchTypeScan' +-- mode and thus results will not be sorted. Combine this with +-- 'advanceScroll' to efficiently stream through the full result set +getInitialScroll :: MonadBH m => IndexName -> MappingName -> Search -> m (Maybe ScrollId) +getInitialScroll (IndexName indexName) (MappingName mappingName) search = do let url = joinPath [indexName, mappingName, "_search"] search' = search { searchType = SearchTypeScan } resp' <- bindM2 dispatchSearch url (return search') @@ -617,31 +624,54 @@ scanSearch' (IndexName indexName) (MappingName mappingName) search = do msid = maybe Nothing scrollId msr return msid -scroll' :: (FromJSON a, MonadBH m) => Maybe ScrollId -> m ([Hit a], Maybe ScrollId) +scroll' :: (FromJSON a, MonadBH m, MonadThrow m) => Maybe ScrollId -> m ([Hit a], Maybe ScrollId) scroll' Nothing = return ([], Nothing) scroll' (Just sid) = do - url <- joinPath ["_search/scroll?scroll=1m"] - resp' <- post url (Just . L.fromStrict $ T.encodeUtf8 sid) - let msr = decode' $ responseBody resp' :: FromJSON a => Maybe (SearchResult a) - resp = case msr of - Just sr -> (hits $ searchHits sr, scrollId sr) - _ -> ([], Nothing) - return resp + res <- advanceScroll sid 60 + case res of + Right SearchResult {..} -> return (hits searchHits, scrollId) + Left _ -> return ([], Nothing) -simpleAccumilator :: (FromJSON a, MonadBH m) => [Hit a] -> ([Hit a], Maybe ScrollId) -> m ([Hit a], Maybe ScrollId) -simpleAccumilator oldHits (newHits, Nothing) = return (oldHits ++ newHits, Nothing) -simpleAccumilator oldHits ([], _) = return (oldHits, Nothing) -simpleAccumilator oldHits (newHits, msid) = do +-- | Use the given scroll to fetch the next page of documents. If +-- there are still further pages, there will be a value in the +-- 'scrollId' field of the 'SearchResult' +advanceScroll + :: ( FromJSON a + , MonadBH m + , MonadThrow m + ) + => ScrollId + -> NominalDiffTime + -- ^ How long should the snapshot of data be kept around? This timeout is updated every time 'advanceScroll' is used, so don't feel the need to set it to the entire duration of your search processing. Note that durations < 1s will be rounded up. Also note that 'NominalDiffTime' is an instance of Num so literals like 60 will be interpreted as seconds. 60s is a reasonable default. + -> m (Either EsError (SearchResult a)) +advanceScroll (ScrollId sid) scroll = do + url <- joinPath ["_search/scroll?scroll=" <> scrollTime] + parseEsResponse =<< post url (Just . L.fromStrict $ T.encodeUtf8 sid) + where scrollTime = showText secs <> "s" + secs :: Integer + secs = round scroll + +simpleAccumulator :: (FromJSON a, MonadBH m, MonadThrow m) => [Hit a] -> ([Hit a], Maybe ScrollId) -> m ([Hit a], Maybe ScrollId) +simpleAccumulator oldHits (newHits, Nothing) = return (oldHits ++ newHits, Nothing) +simpleAccumulator oldHits ([], _) = return (oldHits, Nothing) +simpleAccumulator oldHits (newHits, msid) = do (newHits', msid') <- scroll' msid - simpleAccumilator (oldHits ++ newHits) (newHits', msid') + simpleAccumulator (oldHits ++ newHits) (newHits', msid') -- | 'scanSearch' uses the 'scan&scroll' API of elastic, --- for a given 'IndexName' and 'MappingName', -scanSearch :: (FromJSON a, MonadBH m) => IndexName -> MappingName -> Search -> m [Hit a] +-- for a given 'IndexName' and 'MappingName'. Note that this will +-- consume the entire search result set and will be doing O(n) list +-- appends so this may not be suitable for large result sets. In that +-- case, 'getInitialScroll' and 'advanceScroll' are good low level +-- tools. You should be able to hook them up trivially to conduit, +-- pipes, or your favorite streaming IO abstraction of choice. Note +-- that ordering on the search would destroy performance and thus is +-- ignored. +scanSearch :: (FromJSON a, MonadBH m, MonadThrow m) => IndexName -> MappingName -> Search -> m [Hit a] scanSearch indexName mappingName search = do - msid <- scanSearch' indexName mappingName search + msid <- getInitialScroll indexName mappingName search (hits, msid') <- scroll' msid - (totalHits, _) <- simpleAccumilator [] (hits, msid') + (totalHits, _) <- simpleAccumulator [] (hits, msid') return totalHits -- | 'mkSearch' is a helper function for defaulting additional fields of a 'Search' diff --git a/src/Database/Bloodhound/Types.hs b/src/Database/Bloodhound/Types.hs index 682d073..abcaa9e 100644 --- a/src/Database/Bloodhound/Types.hs +++ b/src/Database/Bloodhound/Types.hs @@ -83,7 +83,7 @@ module Database.Bloodhound.Types , Search(..) , SearchType(..) , SearchResult(..) - , ScrollId + , ScrollId(..) , SearchHits(..) , TrackSortScores , From(..) @@ -1332,7 +1332,7 @@ data SearchResult a = , aggregations :: Maybe AggregationResults , scrollId :: Maybe ScrollId } deriving (Eq, Show) -type ScrollId = Text -- Fixme: Newtype +newtype ScrollId = ScrollId Text deriving (Eq, Show, Ord, ToJSON, FromJSON) type Score = Maybe Double