{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE ExtendedDefaultRules #-}
{-# OPTIONS_GHC -fno-warn-type-defaults #-}

-- | Execute a Select query against the BigQuery REST API.
module Hasura.Backends.BigQuery.Execute
  ( executeSelect,
    runExecute,
    streamBigQuery,
    BigQuery (..),
    OutputValue (..),
    RecordSet (..),
    Execute,
    Value (..),
    FieldNameText (..),
  )
where

import Control.Applicative
import Control.Concurrent
import Control.Exception.Safe
import Control.Monad.Except
import Control.Monad.Reader
import Data.Aeson ((.!=), (.:), (.:?), (.=))
import Data.Aeson qualified as Aeson
import Data.Aeson.Types qualified as Aeson
import Data.ByteString.Lazy qualified as L
import Data.Foldable
import Data.HashMap.Strict.InsOrd qualified as OMap
import Data.Maybe
import Data.Text qualified as T
import Data.Text.Lazy qualified as LT
import Data.Text.Lazy.Builder qualified as LT
import Data.Vector (Vector)
import Data.Vector qualified as V
import GHC.Generics
import Hasura.Backends.BigQuery.Connection
import Hasura.Backends.BigQuery.Source
import Hasura.Backends.BigQuery.ToQuery qualified as ToQuery
import Hasura.Backends.BigQuery.Types as BigQuery
import Hasura.Prelude hiding (head, state, tail)
import Network.HTTP.Simple
import Network.HTTP.Types

--------------------------------------------------------------------------------
-- Types

-- | A set of records produced by the database. These are joined
-- together. There are all sorts of optimizations possible here, from
-- using a matrix/flat vector, unboxed sums for Value, etc. Presently
-- we choose a naive implementation in the interest of getting other
-- work done.
data RecordSet = RecordSet
  { -- | One ordered field-name-to-value map per result row.
    rows :: !(Vector (InsOrdHashMap FieldNameText OutputValue)),
    -- | When 'Just', the only field names that 'getFinalRecordSet'
    -- keeps in each row; 'Nothing' keeps every field.
    wantedFields :: !(Maybe [Text])
  }
  deriving (Show)

-- | As opposed to BigQuery.FieldName which is a qualified name, this
-- is just the unqualified text name itself.
newtype FieldNameText = FieldNameText Text
  deriving (Show, Ord, Eq, Hashable, Aeson.FromJSON, Aeson.ToJSONKey, IsString)

-- | A single value decoded from a BigQuery result, tagged with the
-- BigQuery type it was decoded as.
data OutputValue
  = DecimalOutputValue !Decimal
  | BigDecimalOutputValue !BigDecimal
  | IntegerOutputValue !Int64
  | FloatOutputValue !Float64
  | GeographyOutputValue !Geography
  | TextOutputValue !Text
  | TimestampOutputValue !Timestamp
  | DateOutputValue !Date
  | TimeOutputValue !Time
  | DatetimeOutputValue !Datetime
  | BytesOutputValue !Base64
  | BoolOutputValue !Bool
  | ArrayOutputValue !(Vector OutputValue)
  | RecordOutputValue !(InsOrdHashMap FieldNameText OutputValue)
  | NullOutputValue -- TODO: Consider implications.
  deriving (Show, Eq, Generic)

instance Hashable OutputValue

-- | Each constructor serializes as the JSON encoding of its payload;
-- 'NullOutputValue' serializes as JSON @null@.
instance Aeson.ToJSON OutputValue where
  toJSON NullOutputValue = Aeson.Null
  toJSON (DecimalOutputValue decimal) = Aeson.toJSON decimal
  toJSON (BigDecimalOutputValue bigDecimal) = Aeson.toJSON bigDecimal
  toJSON (FloatOutputValue float) = Aeson.toJSON float
  toJSON (TextOutputValue text) = Aeson.toJSON text
  toJSON (BytesOutputValue bytes) = Aeson.toJSON bytes
  toJSON (DateOutputValue date) = Aeson.toJSON date
  toJSON (TimestampOutputValue timestamp) = Aeson.toJSON timestamp
  toJSON (TimeOutputValue time) = Aeson.toJSON time
  toJSON (DatetimeOutputValue datetime) = Aeson.toJSON datetime
  toJSON (GeographyOutputValue geography) = Aeson.toJSON geography
  toJSON (BoolOutputValue bool) = Aeson.toJSON bool
  toJSON (IntegerOutputValue integer) = Aeson.toJSON integer
  toJSON (ArrayOutputValue elements) = Aeson.toJSON elements
  toJSON (RecordOutputValue fields) = Aeson.toJSON fields

-- | Environment available to every 'Execute' action.
data ExecuteReader = ExecuteReader
  { credentials :: !BigQuerySourceConfig
  }

-- | Everything that can go wrong while creating a query job or
-- fetching its results.
data ExecuteProblem
  = GetJobDecodeProblem String
  | CreateQueryJobDecodeProblem String
  | ErrorResponseFromServer Status L.ByteString
  | GetJobResultsProblem SomeException
  | RESTRequestNonOK Status Text
  | CreateQueryJobProblem SomeException
  | ExecuteRunBigQueryProblem BigQueryProblem
  deriving (Show)

-- | Execute monad: a reader over 'ExecuteReader' whose failures are
-- reported as 'ExecuteProblem', running in 'IO'.
newtype Execute a = Execute
  { unExecute :: ReaderT ExecuteReader (ExceptT ExecuteProblem IO) a
  }
  deriving
    ( Functor,
      Applicative,
      Monad,
      MonadReader ExecuteReader,
      MonadIO,
      MonadError ExecuteProblem
    )

-- | Big query parameters must be accompanied by an explicit type
-- signature.
data BigQueryType
  = DECIMAL
  | INTEGER
  | FLOAT
  | BYTES
  | STRING
  | BOOL
  | ARRAY BigQueryType
  | GEOGRAPHY
  | DATE
  | TIMESTAMP
  | DATETIME
  | TIME
  | BIGDECIMAL
  deriving (Show, Eq)

-- | A rendered query ready to be submitted as a job: the SQL text,
-- its named parameters and the cardinality of the originating select.
data BigQuery = BigQuery
  { query :: !LT.Text,
    parameters :: !(InsOrdHashMap ParameterName Parameter),
    cardinality :: BigQuery.Cardinality
  }
  deriving (Show)

-- | A query parameter: the value together with its declared type.
data Parameter = Parameter
  { typ :: !BigQueryType,
    value :: !Value
  }
  deriving (Show)

newtype ParameterName
  = ParameterName LT.Text
  deriving (Show, Aeson.ToJSON, Ord, Eq, Hashable)

-- | One column (or nested struct member) of a result schema.
data BigQueryField = BigQueryField
  { name :: !FieldNameText,
    typ :: !BigQueryFieldType,
    mode :: !Mode
  }
  deriving (Show)

-- | The type of a result column; structs carry their member fields.
data BigQueryFieldType
  = FieldSTRING
  | FieldBYTES
  | FieldINTEGER
  | FieldFLOAT
  | FieldBOOL
  | FieldTIMESTAMP
  | FieldDATE
  | FieldTIME
  | FieldDATETIME
  | FieldGEOGRAPHY
  | FieldDECIMAL
  | FieldBIGDECIMAL
  | FieldSTRUCT (Vector BigQueryField)
  deriving (Show)

-- | The mode of a schema field, as decoded by the 'Aeson.FromJSON'
-- instance for 'Mode'.
data Mode
  = Nullable
  | NotNullable
  | Repeated
  deriving (Show)

-- | Whether a JSON @null@ is accepted when decoding a value; consulted
-- by 'has_v'.
data IsNullable
  = IsNullable
  | IsRequired

--------------------------------------------------------------------------------
-- Constants

-- | Delay between attempts to get job results if the job is incomplete.
streamDelaySeconds :: Int
streamDelaySeconds = 1

--------------------------------------------------------------------------------
-- Executing the planned actions forest

-- | Run an 'Execute' action against the given source configuration.
-- The resulting record set is passed through 'getFinalRecordSet'
-- before being returned.
runExecute ::
  MonadIO m =>
  BigQuerySourceConfig ->
  Execute RecordSet ->
  m (Either ExecuteProblem RecordSet)
runExecute credentials m =
  liftIO . runExceptT $
    runReaderT (unExecute (m >>= getFinalRecordSet)) ExecuteReader {credentials}

-- | Render the select, run it as a BigQuery job and record which
-- fields the select finally wants on the resulting record set.
executeSelect :: Select -> Execute RecordSet
executeSelect select = do
  sourceConfig <- asks credentials
  outcome <- streamBigQuery sourceConfig (selectToBigQuery select)
  fetched <- liftEither outcome
  pure fetched {wantedFields = selectFinalWantedFields select}

-- | This is needed to strip out unneeded fields (join keys) in the
-- final query.  This is a relic of the data loader approach. A later
-- improvement would be to update the FromIr code to explicitly
-- reselect the query. But the purpose of this commit is to drop the
-- dataloader code and not modify the from IR code which is more
-- delicate.
getFinalRecordSet :: RecordSet -> Execute RecordSet
getFinalRecordSet recordSet =
  pure recordSet {rows = pruneRow <$> rows recordSet}
  where
    pruneRow = OMap.filterWithKey (\(FieldNameText key) _ -> isWanted key)

    -- With no 'wantedFields' every field is kept.
    isWanted key = case wantedFields recordSet of
      Nothing -> True
      Just wanted -> key `elem` wanted

--------------------------------------------------------------------------------
-- Make a big query from a select

-- | Render a 'Select' to SQL text plus named, typed parameters.
selectToBigQuery :: Select -> BigQuery
selectToBigQuery select =
  BigQuery
    { query = LT.toLazyText renderedQuery,
      parameters = OMap.fromList (map toNamedParameter (OMap.toList renderedParams)),
      cardinality = selectCardinality select
    }
  where
    (renderedQuery, renderedParams) =
      ToQuery.renderBuilderPretty (ToQuery.fromSelect select)

    toNamedParameter (index, paramValue) =
      ( ParameterName (LT.toLazyText (ToQuery.paramName index)),
        Parameter {typ = valueType paramValue, value = paramValue}
      )

--------------------------------------------------------------------------------
-- Type system

-- | Make a BigQuery type for the given value.
valueType :: Value -> BigQueryType
valueType DecimalValue {} = DECIMAL
valueType BigDecimalValue {} = BIGDECIMAL
valueType IntegerValue {} = INTEGER
valueType FloatValue {} = FLOAT
valueType GeographyValue {} = GEOGRAPHY
valueType StringValue {} = STRING
valueType BytesValue {} = BYTES
valueType BoolValue {} = BOOL
valueType DatetimeValue {} = DATETIME
valueType TimeValue {} = TIME
valueType DateValue {} = DATE
valueType TimestampValue {} = TIMESTAMP
valueType (ArrayValue values) =
  -- The element type is taken from the first element. Later, we
  -- could add some kind of sanity check that they are all the same
  -- type.
  ARRAY $ case values V.!? 0 of
    -- An empty array has no element to infer from, so the element
    -- type doesn't matter; we put STRING.
    Nothing -> STRING
    Just firstElement -> valueType firstElement
-- If the value is null, it doesn't matter what type it is. So we put
-- STRING.
valueType NullValue = STRING

--------------------------------------------------------------------------------
-- JSON serialization

-- | Make the JSON representation of the given value, in the shape
-- used for a query parameter's @parameterValue@.
valueToBigQueryJson :: Value -> Aeson.Value
valueToBigQueryJson = \case
  -- TODO: I haven't tested whether BigQuery is happy with this null value.
  NullValue -> Aeson.Null
  DecimalValue i -> scalar i
  BigDecimalValue i -> scalar i
  IntegerValue i -> scalar i
  FloatValue i -> scalar i
  TimestampValue i -> scalar i
  DateValue (Date i) -> scalar i
  TimeValue (Time i) -> scalar i
  DatetimeValue (Datetime i) -> scalar i
  GeographyValue (Geography i) -> scalar i
  StringValue i -> scalar (Aeson.String i)
  BytesValue i -> scalar i
  BoolValue i -> scalar (Aeson.String (if i then "true" else "false"))
  ArrayValue vs ->
    Aeson.object ["array_values" .= Aeson.Array (fmap valueToBigQueryJson vs)]
  where
    -- Wrap a single encodable payload as @{"value": ...}@.
    scalar :: Aeson.ToJSON a => a -> Aeson.Value
    scalar payload = Aeson.object ["value" .= payload]

--------------------------------------------------------------------------------
-- Execute a query as a job and stream the results into a record set

-- | TODO: WARNING: This function hasn't been tested on Big Data(tm),
-- and therefore I was unable to get BigQuery to produce paginated
-- results that would contain the 'pageToken' field in the JSON
-- response. Until that test has been done, we should consider this a
-- preliminary implementation.
streamBigQuery ::
  MonadIO m =>
  BigQuerySourceConfig ->
  BigQuery ->
  m (Either ExecuteProblem RecordSet)
streamBigQuery credentials bigquery =
  createQueryJob credentials bigquery >>= \case
    Left problem -> pure (Left problem)
    Right job -> collectPages job Nothing Nothing
  where
    -- Fetch pages of results for the job, appending each page's rows to
    -- what has been accumulated so far, until no page token is returned.
    collectPages job token accumulated =
      getJobResults credentials job Fetch {pageToken = token} >>= \case
        Left problem -> pure (Left problem)
        Right JobIncomplete -> do
          -- Not ready yet: wait, then retry the same page.
          liftIO (threadDelay (1000 * 1000 * streamDelaySeconds))
          collectPages job token accumulated
        Right (JobComplete JobResults {pageToken = nextToken, recordSet = page}) ->
          let merged =
                maybe
                  page
                  (\soFar -> soFar {rows = rows soFar <> rows page})
                  accumulated
           in case nextToken of
                Nothing -> pure (Right merged)
                Just more -> collectPages job (Just more) (Just merged)

--------------------------------------------------------------------------------
-- Querying results from a job

-- | One page of results for a completed job.
data JobResults = JobResults
  { pageToken :: Maybe Text,
    recordSet :: RecordSet
  }
  deriving (Show)

-- | Decodes the record set payload and the optional @pageToken@; an
-- empty token is treated the same as an absent one.
instance Aeson.FromJSON JobResults where
  parseJSON = Aeson.withObject "JobResults" $ \o -> do
    parsedRecords <- parseRecordSetPayload o
    rawToken <- o .:? "pageToken"
    pure
      JobResults
        { pageToken = rawToken >>= nonEmptyToken,
          recordSet = parsedRecords
        }
    where
      nonEmptyToken token
        | T.null token = Nothing
        | otherwise = Just token

data JobResultsResponse
  = JobIncomplete
  | JobComplete JobResults
  deriving (Show)

-- | Requires @kind@ to be @bigquery#getQueryResultsResponse@, then
-- decodes the results only when @jobComplete@ is true.
instance Aeson.FromJSON JobResultsResponse where
  parseJSON j = flip (Aeson.withObject "JobResultsResponse") j $ \o -> do
    kind <- o .: "kind"
    case kind :: Text of
      "bigquery#getQueryResultsResponse" -> do
        complete <- o .: "jobComplete"
        if complete
          then JobComplete <$> Aeson.parseJSON j
          else pure JobIncomplete
      _ -> fail ("Invalid kind: " <> show kind)

-- | What to fetch: the page identified by the token, or the first
-- page when there is none.
data Fetch = Fetch
  { pageToken :: Maybe Text
  }
  deriving (Show)

-- | Get results of a job.
getJobResults ::
  MonadIO m =>
  BigQuerySourceConfig ->
  Job ->
  Fetch ->
  m (Either ExecuteProblem JobResultsResponse)
getJobResults sc@BigQuerySourceConfig {..} Job {jobId, location} Fetch {pageToken} =
  -- Any synchronous exception raised while running the request is
  -- reported as a 'GetJobResultsProblem'.
  liftIO (run `catchAny` (pure . Left . GetJobResultsProblem))
  where
    -- https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get#query-parameters
    url =
      mconcat
        [ "GET https://bigquery.googleapis.com/bigquery/v2/projects/",
          T.unpack _scProjectId,
          "/queries/",
          T.unpack jobId,
          "?alt=json&prettyPrint=false",
          "&location=",
          T.unpack location,
          "&",
          T.unpack (encodeParams extraParameters)
        ]

    run = do
      let req =
            setRequestHeader "Content-Type" ["application/json"] (parseRequest_ url)
      runBigQuery sc req >>= \case
        Left e -> pure (Left (ExecuteRunBigQueryProblem e))
        Right resp
          | getResponseStatusCode resp == 200 ->
              pure
                ( either
                    (Left . GetJobDecodeProblem)
                    Right
                    (Aeson.eitherDecode (getResponseBody resp))
                )
          | otherwise ->
              pure
                ( Left
                    ( RESTRequestNonOK
                        (getResponseStatus resp)
                        (lbsToTxt (getResponseBody resp))
                    )
                )

    -- Only the page token, when there is one.
    extraParameters = maybe [] (\token -> [("pageToken", token)]) pageToken

    encodeParams pairs =
      T.intercalate "&" [key <> "=" <> val | (key, val) <- pairs]

--------------------------------------------------------------------------------
-- Creating jobs

-- | The parts of a created job that this module reads back.
data Job = Job
  { state :: !Text,
    jobId :: !Text,
    location :: !Text
  }
  deriving (Show)

-- | Requires @kind@ to be @bigquery#job@; reads @status.state@ and the
-- @jobId@ and @location@ of @jobReference@.
instance Aeson.FromJSON Job where
  parseJSON = Aeson.withObject "Job" $ \o -> do
    kind <- o .: "kind"
    case kind :: Text of
      "bigquery#job" -> do
        status <- o .: "status"
        jobState <- status .: "state"
        -- 'location' is needed in addition to 'jobId' to query a job's
        -- status
        ref <- o .: "jobReference"
        refJobId <- ref .: "jobId"
        refLocation <- ref .: "location"
        pure Job {state = jobState, jobId = refJobId, location = refLocation}
      _ -> fail ("Invalid kind: " <> show kind)

-- | Create a job asynchronously.
createQueryJob :: MonadIO m => BigQuerySourceConfig -> BigQuery -> m (Either ExecuteProblem Job)
createQueryJob sc@BigQuerySourceConfig {..} BigQuery {..} =
  liftIO
    ( do
        -- putStrLn (LT.unpack query)
        -- Any synchronous exception raised while running the request is
        -- reported as a 'CreateQueryJobProblem'.
        catchAny run (pure . Left . CreateQueryJobProblem)
    )
  where
    run = do
      let url =
            "POST https://content-bigquery.googleapis.com/bigquery/v2/projects/"
              <> T.unpack _scProjectId
              <> "/jobs?alt=json&prettyPrint=false"
      let req =
            setRequestHeader "Content-Type" ["application/json"] $
              setRequestBodyLBS body $
                parseRequest_ url
      eResp <- runBigQuery sc req
      case eResp of
        Left e -> pure (Left (ExecuteRunBigQueryProblem e))
        Right resp ->
          case getResponseStatusCode resp of
            200 -> case Aeson.eitherDecode (getResponseBody resp) of
              Left e -> pure (Left (CreateQueryJobDecodeProblem e))
              Right job -> pure (Right job)
            _ -> do
              pure $
                Left $
                  RESTRequestNonOK (getResponseStatus resp) $
                    lbsToTxt $
                      getResponseBody resp
    -- The job configuration: the query text plus its named parameters,
    -- each with an explicit type and value.
    body =
      Aeson.encode
        ( Aeson.object
            [ "configuration"
                .= Aeson.object
                  [ "jobType" .= "QUERY",
                    "query"
                      .= Aeson.object
                        [ "query" .= query,
                          "useLegacySql" .= False, -- Important, it makes `quotes` work properly.
                          "parameterMode" .= "NAMED",
                          "queryParameters"
                            .= map
                              ( \(name, Parameter {..}) ->
                                  Aeson.object
                                    [ "name" .= Aeson.toJSON name,
                                      "parameterType" .= Aeson.toJSON typ,
                                      "parameterValue" .= valueToBigQueryJson value
                                    ]
                              )
                              (OMap.toList parameters)
                        ]
                  ]
            ]
        )

--------------------------------------------------------------------------------
-- Consuming recordset from big query

-- | Decode the @schema@ and @rows@ members of a results response into
-- a 'RecordSet'. A missing schema means no columns and missing rows
-- mean no records. Parse failures are annotated with the row index
-- under the @rows@ key.
parseRecordSetPayload :: Aeson.Object -> Aeson.Parser RecordSet
parseRecordSetPayload resp = do
  mSchema <- resp .:? "schema"
  columns <- maybe (pure V.empty) (.: "fields") mSchema :: Aeson.Parser (Vector BigQueryField)
  rowsJSON <- fmap (fromMaybe V.empty) (resp .:? "rows" :: Aeson.Parser (Maybe (Vector Aeson.Value)))
  rows <-
    V.imapM (\i row -> parseRow columns row Aeson.<?> Aeson.Index i) rowsJSON
      Aeson.<?> Aeson.Key "rows"
  pure RecordSet {wantedFields = Nothing, rows}

--------------------------------------------------------------------------------
-- Schema-driven JSON deserialization

-- | Parse a top-level row against the column schema, requiring the
-- result to be a record.
parseRow :: Vector BigQueryField -> Aeson.Value -> Aeson.Parser (InsOrdHashMap FieldNameText OutputValue)
parseRow columnTypes value = do
  result <- parseBigQueryRow columnTypes value
  case result of
    RecordOutputValue row -> pure row
    _ -> fail ("Expected a record when parsing a top-level row: " ++ show value)

-- | Parse a row, which at the top-level of the "rows" output has no
-- {"v":..} wrapper. But when appearing nestedly, does have the
-- wrapper. See 'parseBigQueryValue'.
parseBigQueryRow :: Vector BigQueryField -> Aeson.Value -> Aeson.Parser OutputValue
parseBigQueryRow columnTypes =
  Aeson.withObject
    "RECORD"
    ( \o -> do
        fields <- o .: "f" Aeson.<?> Aeson.Key "RECORD"
        -- Columns and cells are paired positionally.
        values <-
          sequence
            ( V.izipWith
                ( \i typ field ->
                    parseBigQueryField typ field Aeson.<?> Aeson.Index i
                )
                columnTypes
                fields
            )
            Aeson.<?> Aeson.Key "f"
        pure (RecordOutputValue (OMap.fromList (V.toList values)))
    )

-- | Parse one {"v":..}-wrapped value according to its field type. The
-- type name is recorded in the JSON path of any parse failure.
parseBigQueryValue :: IsNullable -> BigQueryFieldType -> Aeson.Value -> Aeson.Parser OutputValue
parseBigQueryValue isNullable fieldType object =
  case fieldType of
    FieldSTRUCT types ->
      has_v isNullable (parseBigQueryRow types) object Aeson.<?> Aeson.Key "RECORD"
    FieldDECIMAL ->
      has_v isNullable (fmap DecimalOutputValue . Aeson.parseJSON) object
        Aeson.<?> Aeson.Key "DECIMAL"
    FieldBIGDECIMAL ->
      has_v isNullable (fmap BigDecimalOutputValue . Aeson.parseJSON) object
        Aeson.<?> Aeson.Key "BIGDECIMAL"
    FieldINTEGER ->
      has_v isNullable (fmap IntegerOutputValue . Aeson.parseJSON) object
        Aeson.<?> Aeson.Key "INTEGER"
    FieldDATE ->
      has_v isNullable (fmap DateOutputValue . Aeson.parseJSON) object
        Aeson.<?> Aeson.Key "DATE"
    FieldTIME ->
      has_v isNullable (fmap TimeOutputValue . Aeson.parseJSON) object
        Aeson.<?> Aeson.Key "TIME"
    FieldDATETIME ->
      has_v isNullable (fmap DatetimeOutputValue . Aeson.parseJSON) object
        Aeson.<?> Aeson.Key "DATETIME"
    FieldTIMESTAMP ->
      has_v isNullable (fmap (TimestampOutputValue . Timestamp . utctimeToISO8601Text) . Aeson.withText "FieldTIMESTAMP" textToUTCTime) object
        Aeson.<?> Aeson.Key "TIMESTAMP"
    FieldGEOGRAPHY ->
      has_v isNullable (fmap GeographyOutputValue . Aeson.parseJSON) object
        Aeson.<?> Aeson.Key "GEOGRAPHY"
    FieldFLOAT ->
      has_v isNullable (fmap FloatOutputValue . Aeson.parseJSON) object
        Aeson.<?> Aeson.Key "FLOAT"
    FieldBOOL ->
      has_v isNullable (fmap (BoolOutputValue . (== "true")) . Aeson.parseJSON) object
        Aeson.<?> Aeson.Key "BOOL"
    FieldSTRING ->
      has_v isNullable (fmap TextOutputValue . Aeson.parseJSON) object
        Aeson.<?> Aeson.Key "STRING"
    FieldBYTES ->
      has_v isNullable (fmap BytesOutputValue . Aeson.parseJSON) object
        Aeson.<?> Aeson.Key "BYTES"

-- | Parse one cell of a row according to its schema field, pairing the
-- result with the field's name. Repeated fields become an
-- 'ArrayOutputValue' whose elements are parsed as required values.
parseBigQueryField :: BigQueryField -> Aeson.Value -> Aeson.Parser (FieldNameText, OutputValue)
parseBigQueryField BigQueryField {name, typ, mode} value1 =
  case mode of
    Repeated ->
      ( do
          values <- has_v_generic Aeson.parseJSON value1
          outputs <-
            V.imapM
              ( \i value2 ->
                  parseBigQueryValue IsRequired typ value2
                    Aeson.<?> Aeson.Index i
              )
              values
          pure (name, ArrayOutputValue outputs)
      )
        Aeson.<?> Aeson.Key "REPEATED"
    Nullable -> do
      output <-
        parseBigQueryValue IsNullable typ value1 Aeson.<?> Aeson.Key "NULLABLE"
      pure (name, output)
    NotNullable -> do
      output <-
        parseBigQueryValue IsRequired typ value1 Aeson.<?> Aeson.Key "REQUIRED"
      pure (name, output)

-- Every value, after the top-level row, is wrapped in this.
--
-- A JSON null under "v" becomes 'NullOutputValue' only when the value
-- is nullable; otherwise the null is handed to the given parser like
-- any other payload.
has_v ::
  IsNullable ->
  (Aeson.Value -> Aeson.Parser OutputValue) ->
  Aeson.Value ->
  Aeson.Parser OutputValue
has_v isNullable f =
  Aeson.withObject
    "HAS_V"
    ( \o ->
        o .: "v" >>= \v ->
          case v of
            Aeson.Null
              | IsNullable <- isNullable -> pure NullOutputValue
            _ -> f v Aeson.<?> Aeson.Key "v"
    )

-- Every value, after the top-level row, is wrapped in this.
--
-- Unlike 'has_v' this applies no null handling and works for any
-- result type.
has_v_generic ::
  (Aeson.Value -> Aeson.Parser a) ->
  Aeson.Value ->
  Aeson.Parser a
has_v_generic f =
  Aeson.withObject
    "HAS_V"
    (\o -> o .: "v" >>= \v -> (f v Aeson.<?> Aeson.Key "v"))

--------------------------------------------------------------------------------
-- Generic JSON deserialization

-- | Encodes a parameter type as an object with a @type@ member;
-- arrays additionally carry the element type under @arrayType@.
instance Aeson.ToJSON BigQueryType where
  toJSON =
    \case
      ARRAY t -> Aeson.object ["type" .= ("ARRAY" :: Text), "arrayType" .= t]
      DECIMAL -> atomic "NUMERIC"
      BIGDECIMAL -> atomic "BIGNUMERIC"
      INTEGER -> atomic "INTEGER"
      DATE -> atomic "DATE"
      TIME -> atomic "TIME"
      DATETIME -> atomic "DATETIME"
      TIMESTAMP -> atomic "TIMESTAMP"
      FLOAT -> atomic "FLOAT"
      GEOGRAPHY -> atomic "GEOGRAPHY"
      STRING -> atomic "STRING"
      BYTES -> atomic "BYTES"
      BOOL -> atomic "BOOL"
    where
      atomic ty = Aeson.object ["type" .= (ty :: Text)]

-- | Decodes a schema field from its @name@, @type@ and optional
-- @mode@ (defaulting to 'Nullable'). Record/struct types recursively
-- decode their @fields@; unrecognised type names fail the parse.
instance Aeson.FromJSON BigQueryField where
  parseJSON =
    Aeson.withObject
      "BigQueryField"
      ( \o -> do
          name <- o .: "name"
          typ <-
            do
              flag :: Text <- o .: "type"
              if
                  | flag == "NUMERIC" || flag == "DECIMAL" -> pure FieldDECIMAL
                  | flag == "BIGNUMERIC" || flag == "BIGDECIMAL" -> pure FieldBIGDECIMAL
                  | flag == "INT64" || flag == "INTEGER" -> pure FieldINTEGER
                  | flag == "FLOAT64" || flag == "FLOAT" -> pure FieldFLOAT
                  | flag == "BOOLEAN" || flag == "BOOL" -> pure FieldBOOL
                  | flag == "STRING" -> pure FieldSTRING
                  | flag == "DATE" -> pure FieldDATE
                  | flag == "TIME" -> pure FieldTIME
                  | flag == "DATETIME" -> pure FieldDATETIME
                  | flag == "TIMESTAMP" -> pure FieldTIMESTAMP
                  | flag == "GEOGRAPHY" -> pure FieldGEOGRAPHY
                  | flag == "BYTES" -> pure FieldBYTES
                  | flag == "RECORD" || flag == "STRUCT" ->
                    do
                      fields <- o .: "fields"
                      pure (FieldSTRUCT fields)
                  | otherwise -> fail ("Unsupported field type: " ++ show flag)
          mode <- o .:? "mode" .!= Nullable
          pure BigQueryField {..}
      )

-- | Decodes a mode string. Anything other than @NULLABLE@ or
-- @REPEATED@ is treated as 'NotNullable'.
instance Aeson.FromJSON Mode where
  parseJSON j = do
    s <- Aeson.parseJSON j
    case s :: Text of
      "NULLABLE" -> pure Nullable
      "REPEATED" -> pure Repeated
      _ -> pure NotNullable