graphql-engine/server/src-lib/Hasura/Backends/MySQL/DataLoader/Execute.hs

399 lines
13 KiB
Haskell
Raw Normal View History

{-# LANGUAGE UndecidableInstances #-}
-- |
--
-- Execute the plan given from .Plan.
module Hasura.Backends.MySQL.DataLoader.Execute
( OutputValue (..),
RecordSet (..),
ExecuteProblem (..),
execute,
runExecute,
-- for testing
joinObjectRows,
leftObjectJoin,
)
where
import Control.Monad.IO.Class
import Data.Aeson hiding (Value)
import Data.Aeson qualified as J
import Data.Bifunctor
import Data.Foldable
import Data.Graph
import Data.HashMap.Strict.InsOrd qualified as OMap
import Data.IORef
import Data.Vector (Vector)
import Data.Vector qualified as V
import GHC.TypeLits qualified
import Hasura.Backends.MySQL.Connection (runQueryYieldingRows)
import Hasura.Backends.MySQL.DataLoader.Plan
( Action (..),
FieldName (..),
HeadAndTail (..),
Join
( joinFieldName,
joinRhsOffset,
joinRhsTop,
joinType,
leftRecordSet,
rightRecordSet
),
PlannedAction (..),
Ref,
selectQuery,
toFieldName,
)
import Hasura.Backends.MySQL.DataLoader.Plan qualified as DataLoaderPlan
import Hasura.Backends.MySQL.DataLoader.Plan qualified as Plan
import Hasura.Backends.MySQL.ToQuery (fromSelect, toQueryFlat)
import Hasura.Backends.MySQL.Types hiding
( FieldName,
ScalarValue,
selectWhere,
)
-- import Hasura.Backends.MySQL.Types qualified as MySQL
import Hasura.GraphQL.Parser ()
-- Brings an instance for Hashable (Vector a)...
import Hasura.Prelude hiding
( concatMap,
elem,
head,
map,
mapMaybe,
tail,
toList,
)
-- | A set of records produced by the database. These are joined
-- together. There are all sorts of optimizations possible here, from
-- using a matrix/flat vector, unboxed sums for Value, etc. Presently
-- we choose a naive implementation in the interest of getting other
-- work done.
data RecordSet = RecordSet
{ origin :: !(Maybe PlannedAction),
rows :: !(Vector (InsOrdHashMap FieldName OutputValue)),
wantedFields :: !(Maybe [Text])
}
deriving (Show)
instance GHC.TypeLits.TypeError ('GHC.TypeLits.Text "Aeson loses key order, so you can't use this instance.") => ToJSON RecordSet where
toJSON RecordSet {} = error "RecordSet.toJSON: do not use."
-- | The read-only info. used by the Execute monad. Later, this IORef
-- may become either atomically modified or in an STM or MVar so that
-- jobs can be executed in parallel.
data ExecuteReader = ExecuteReader
{ recordSets :: IORef (InsOrdHashMap Ref RecordSet),
credentials :: !SourceConfig
}
-- | Any problem encountered while executing the plan.
data ExecuteProblem
= GetJobDecodeProblem String
| CreateQueryJobDecodeProblem String
| JoinProblem ExecuteProblem
| UnsupportedJoinBug JoinType
| MissingRecordSetBug Ref
| BrokenJoinInvariant [DataLoaderPlan.FieldName]
deriving (Show)
-- | Execute monad; as queries are performed, the record sets are
-- stored in the map.
newtype Execute a = Execute
{unExecute :: ReaderT ExecuteReader (ExceptT ExecuteProblem IO) a}
deriving
( Functor,
Applicative,
Monad,
MonadReader ExecuteReader,
MonadIO,
MonadError ExecuteProblem
)
-- | A value outputted by this execute module in a record set.
data OutputValue
= ArrayOutputValue !(Vector OutputValue)
| RecordOutputValue !(InsOrdHashMap DataLoaderPlan.FieldName OutputValue)
| ScalarOutputValue !J.Value -- TODO: switch to 'MySQL.Scalar...'?
| NullOutputValue
deriving (Show, Eq, Generic)
instance Hashable OutputValue
--------------------------------------------------------------------------------
-- Main entry points
-- | Using the config, run the execute action. Finally, resolve the
-- head-and-tail to a record set.
runExecute ::
MonadIO m =>
SourceConfig ->
HeadAndTail ->
Execute a ->
m (Either ExecuteProblem RecordSet)
runExecute credentials headAndTail action = do
recordSets <- liftIO (newIORef mempty)
liftIO $
runExceptT $
runReaderT
(unExecute (action >> getFinalRecordSet headAndTail))
(ExecuteReader {credentials, recordSets})
-- | Execute the forest of actions.
execute :: Forest PlannedAction -> Execute ()
execute = traverse_ (traverse_ executePlannedAction)
-- | Execute an action, then store its result in the ref assigned to it.
executePlannedAction :: PlannedAction -> Execute ()
executePlannedAction PlannedAction {ref, action} =
fetchRecordSetForAction action >>= saveRecordSet ref
-- | Fetch the record set for the given action.
fetchRecordSetForAction :: Action -> Execute RecordSet
fetchRecordSetForAction =
\case
SelectAction select -> do
recordSet <- do
SourceConfig {scConnectionPool} <- asks credentials
result <-
liftIO $
runExceptT $
runQueryYieldingRows
scConnectionPool
(toQueryFlat (fromSelect (selectQuery select)))
case result of
Left problem -> throwError (JoinProblem problem)
Right rows -> pure (makeRecordSet rows)
-- Update the wanted fields from the original select. This lets
-- the executor know which fields to include after performing a
-- join.
pure recordSet {wantedFields = Plan.selectWantedFields select}
JoinAction Plan.Join {joinType = joinType', joinFieldName = fieldName, ..} -> do
left <- getRecordSet leftRecordSet
right <- getRecordSet rightRecordSet
case joinType' of
ArrayJoin fields ->
leftArrayJoin
wantedFields
fieldName
(toFieldNames fields)
joinRhsTop
joinRhsOffset
left
right
`onLeft` (throwError . JoinProblem)
ObjectJoin fields ->
leftObjectJoin
wantedFields
fieldName
(toFieldNames fields)
left
right
`onLeft` (throwError . JoinProblem)
_ -> throwError (UnsupportedJoinBug joinType')
where
toFieldNames = fmap (bimap toFieldName toFieldName)
-- | Make a record set from a flat record from the DB.
makeRecordSet :: Vector (InsOrdHashMap FieldName J.Value) -> RecordSet
makeRecordSet rows =
RecordSet
{ origin = Nothing, -- No information for this yet, but will follow
-- up with a change for this later.
rows = fmap (fmap ScalarOutputValue) rows,
wantedFields = Nothing
}
saveRecordSet :: Ref -> RecordSet -> Execute ()
saveRecordSet ref recordSet = do
recordSetsRef <- asks recordSets
liftIO (modifyIORef' recordSetsRef (OMap.insert ref recordSet))
getRecordSet :: Ref -> Execute RecordSet
getRecordSet ref = do
recordSetsRef <- asks recordSets
hash <- liftIO (readIORef recordSetsRef)
OMap.lookup ref hash `onNothing` throwError (MissingRecordSetBug ref)
-- | See documentation for 'HeadAndTail'.
getFinalRecordSet :: HeadAndTail -> Execute RecordSet
getFinalRecordSet HeadAndTail {..} = do
headSet <- getRecordSet head
tailSet <-
if tail /= head
then getRecordSet tail
else pure headSet
pure
tailSet
{ rows =
fmap
( OMap.filterWithKey
( \(FieldName k) _ ->
maybe True (elem k) (wantedFields headSet)
)
)
(rows tailSet)
}
{- WIP, it seems:
-- | Make an lhs_fk IN (rhs_fk1, rhs_fk2, ..) expression list.
makeRelationshipIn :: DataLoaderPlan.Relationship -> Execute [Expression]
makeRelationshipIn
DataLoaderPlan.Relationship
{ leftRecordSet,
joinType = _,
rightTable = _rightTable
} = do
RecordSet {rows = _rows} <- getRecordSet leftRecordSet
-- TODO: A follow-up PR will add IN(..) and will join on the join
-- fields for the left/right tables. It needs support from Types.hs.
pure []
where
_lookupField' k row =
case OMap.lookup k row of
Nothing -> Nothing
Just x -> Just x
-- | Will be used by makeRelationshipIn for forming lhs_fk IN (rhs_fk1, rhs_fk2, ..)
planFieldNameToQueryFieldName :: EntityAlias -> FieldName -> MySQL.FieldName
planFieldNameToQueryFieldName (EntityAlias fieldNameEntity) (FieldName fieldName) =
MySQL.FieldName {fNameEntity = fieldNameEntity, fName = fieldName}
-}
-- | Inefficient but clean left object join.
leftObjectJoin ::
Maybe [Text] ->
Text ->
[(DataLoaderPlan.FieldName, DataLoaderPlan.FieldName)] ->
RecordSet ->
RecordSet ->
Either ExecuteProblem RecordSet
leftObjectJoin wantedFields joinAlias joinFields left right = do
rows' <- fmap V.fromList . traverse makeRows . toList $ rows left
pure
RecordSet
{ origin = Nothing,
wantedFields = Nothing,
rows = rows'
}
where
makeRows :: InsOrdHashMap FieldName OutputValue -> Either ExecuteProblem (InsOrdHashMap FieldName OutputValue)
makeRows leftRow =
let rightRows =
V.fromList
[ rightRow
| not (null joinFields),
rightRow <- toList (rows right),
all
( \(rightField, leftField) ->
Just True
== ( do
leftValue <- OMap.lookup leftField leftRow
rightValue <- OMap.lookup rightField rightRow
pure (leftValue == rightValue)
)
)
joinFields
]
in -- The line below will return Left is rightRows has more than one element.
-- Consider moving the check here if it makes sense in the future.
joinObjectRows wantedFields joinAlias leftRow rightRows
-- | A naive, exponential reference implementation of a left join. It
-- serves as a trivial sample implementation for correctness checking
-- of more efficient ones.
leftArrayJoin ::
Maybe [Text] ->
Text ->
[(DataLoaderPlan.FieldName, DataLoaderPlan.FieldName)] ->
Top ->
Maybe Int ->
RecordSet ->
RecordSet ->
Either ExecuteProblem RecordSet
leftArrayJoin wantedFields joinAlias joinFields rhsTop rhsOffset left right =
pure
RecordSet
{ origin = Nothing,
wantedFields = Nothing,
rows =
V.fromList
[ joinArrayRows wantedFields joinAlias leftRow rightRows
| leftRow <- toList (rows left),
let rightRows =
V.fromList
( limit
( offset
[ rightRow
| not (null joinFields),
rightRow <- toList (rows right),
all
( \(rightField, leftField) ->
Just True
== ( do
leftValue <- OMap.lookup leftField leftRow
rightValue <- OMap.lookup rightField rightRow
pure (leftValue == rightValue)
)
)
joinFields
]
)
)
]
}
where
offset = maybe id drop rhsOffset
limit =
case rhsTop of
NoTop -> id
Top n -> take n
-- | Join a row with another as an array join.
joinArrayRows ::
Maybe [Text] ->
Text ->
InsOrdHashMap DataLoaderPlan.FieldName OutputValue ->
Vector (InsOrdHashMap DataLoaderPlan.FieldName OutputValue) ->
InsOrdHashMap DataLoaderPlan.FieldName OutputValue
joinArrayRows wantedFields fieldName leftRow rightRow =
OMap.insert
(DataLoaderPlan.FieldName fieldName)
( ArrayOutputValue
( fmap
( RecordOutputValue
. OMap.filterWithKey
( \(DataLoaderPlan.FieldName k) _ ->
maybe True (elem k) wantedFields
)
)
rightRow
)
)
leftRow
-- | Join a row with another as an object join.
--
-- If rightRow is not a single row, we throw 'BrokenJoinInvariant'.
joinObjectRows ::
Maybe [Text] ->
Text ->
InsOrdHashMap DataLoaderPlan.FieldName OutputValue ->
Vector (InsOrdHashMap DataLoaderPlan.FieldName OutputValue) ->
Either ExecuteProblem (InsOrdHashMap DataLoaderPlan.FieldName OutputValue)
joinObjectRows wantedFields fieldName leftRow rightRows
| V.length rightRows /= 1 = Left . BrokenJoinInvariant . foldMap OMap.keys $ rightRows
| otherwise =
let row = V.head rightRows
in pure $
OMap.insert
(DataLoaderPlan.FieldName fieldName)
( RecordOutputValue
( OMap.filterWithKey
(\(DataLoaderPlan.FieldName k) _ -> maybe True (elem k) wantedFields)
row
)
)
leftRow