mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-12-20 22:11:45 +03:00
5eb72e202b
https://github.com/hasura/graphql-engine-mono/pull/1847 GitOrigin-RevId: ed6c5e0d3caae27f32b97d563873720df77b017a
717 lines
25 KiB
Haskell
717 lines
25 KiB
Haskell
{-# OPTIONS_GHC -fno-warn-type-defaults #-}
|
|
{-# LANGUAGE DuplicateRecordFields #-}
|
|
{-# LANGUAGE ExtendedDefaultRules #-}
|
|
|
|
-- | Execute a Select query against the BigQuery REST API.
|
|
|
|
module Hasura.Backends.BigQuery.Execute
|
|
( executeSelect
|
|
, runExecute
|
|
, streamBigQuery
|
|
, BigQuery(..)
|
|
, OutputValue(..)
|
|
, RecordSet(..)
|
|
, Execute
|
|
, Value(..)
|
|
, FieldNameText(..)
|
|
) where
|
|
|
|
import Control.Applicative
|
|
import Control.Concurrent
|
|
import Control.Exception.Safe
|
|
import Control.Monad.Except
|
|
import Control.Monad.Reader
|
|
import Data.Aeson ((.:), (.:?), (.=))
|
|
import qualified Data.Aeson as Aeson
|
|
import qualified Data.Aeson.Types as Aeson
|
|
import qualified Data.ByteString.Lazy as L
|
|
import Data.Foldable
|
|
import qualified Data.HashMap.Strict.InsOrd as OMap
|
|
import Data.Maybe
|
|
import qualified Data.Text as T
|
|
import qualified Data.Text.Lazy as LT
|
|
import qualified Data.Text.Lazy.Builder as LT
|
|
import Data.Vector (Vector)
|
|
import qualified Data.Vector as V
|
|
import GHC.Generics
|
|
import Hasura.Backends.BigQuery.Connection
|
|
import Hasura.Backends.BigQuery.Source
|
|
import qualified Hasura.Backends.BigQuery.ToQuery as ToQuery
|
|
import Hasura.Backends.BigQuery.Types as BigQuery
|
|
import Hasura.Prelude hiding (head, state, tail)
|
|
import Network.HTTP.Simple
|
|
import Network.HTTP.Types
|
|
|
|
--------------------------------------------------------------------------------
|
|
-- Types
|
|
|
|
-- | The rows produced by a BigQuery query.
--
-- Each row is an insertion-ordered map from unqualified column name to its
-- decoded value. Faster encodings are possible (a flat vector or matrix,
-- unboxed sums for the values, ...), but this simple representation was
-- chosen so that other work could proceed.
data RecordSet = RecordSet
  { rows :: !(Vector (InsOrdHashMap FieldNameText OutputValue))
    -- ^ The rows, in the order they were received.
  , wantedFields :: !(Maybe [Text])
    -- ^ If 'Just', the only field names kept by the final projection
    -- ('getFinalRecordSet'); 'Nothing' keeps every field.
  }
  deriving (Show)
|
|
|
|
-- | The bare, unqualified name of a column. This contrasts with
-- 'BigQuery.FieldName', which is a qualified name.
newtype FieldNameText = FieldNameText Text
  deriving (Show, Ord, Eq, Hashable, Aeson.FromJSON, Aeson.ToJSONKey, IsString)
|
|
|
|
-- | A single decoded cell of a BigQuery result: a scalar, a repeated value
-- ('ArrayOutputValue'), a nested struct ('RecordOutputValue'), or NULL.
data OutputValue
  = DecimalOutputValue !Decimal
  | BigDecimalOutputValue !BigDecimal
  | IntegerOutputValue !Int64
  | FloatOutputValue !Float64
  | GeographyOutputValue !Geography
  | TextOutputValue !Text
  | TimestampOutputValue !Timestamp
  | DateOutputValue !Date
  | TimeOutputValue !Time
  | DatetimeOutputValue !Datetime
  | BytesOutputValue !Base64
  | BoolOutputValue !Bool
  | ArrayOutputValue !(Vector OutputValue)
  | RecordOutputValue !(InsOrdHashMap FieldNameText OutputValue)
  | NullOutputValue -- TODO: Consider implications.
  deriving (Show, Eq, Generic)

-- | Hashing uses the class's default, 'Generic'-based implementation.
instance Hashable OutputValue
|
|
-- | Encode a result value as ordinary JSON.
--
-- 'NullOutputValue' becomes @null@. Arrays and records are encoded
-- element-wise. Every scalar delegates to its payload's own 'Aeson.ToJSON'
-- instance.
instance Aeson.ToJSON OutputValue where
  toJSON outputValue =
    case outputValue of
      NullOutputValue -> Aeson.toJSON Aeson.Null
      ArrayOutputValue elements -> Aeson.toJSON elements
      RecordOutputValue fields -> Aeson.toJSON fields
      DecimalOutputValue d -> Aeson.toJSON d
      BigDecimalOutputValue d -> Aeson.toJSON d
      IntegerOutputValue n -> Aeson.toJSON n
      FloatOutputValue x -> Aeson.toJSON x
      TextOutputValue t -> Aeson.toJSON t
      BytesOutputValue b -> Aeson.toJSON b
      DateOutputValue d -> Aeson.toJSON d
      TimestampOutputValue t -> Aeson.toJSON t
      TimeOutputValue t -> Aeson.toJSON t
      DatetimeOutputValue dt -> Aeson.toJSON dt
      GeographyOutputValue g -> Aeson.toJSON g
      BoolOutputValue b -> Aeson.toJSON b
|
|
|
|
-- | Read-only environment available to every 'Execute' computation.
data ExecuteReader = ExecuteReader
  { credentials :: !BigQuerySourceConfig
    -- ^ Source configuration handed to the BigQuery REST calls.
  }
|
|
|
|
-- | Everything that can go wrong while running a query against BigQuery.
data ExecuteProblem
  = GetJobDecodeProblem String
    -- ^ A job-results response could not be decoded (Aeson's message).
  | CreateQueryJobDecodeProblem String
    -- ^ A job-creation response could not be decoded (Aeson's message).
  | ErrorResponseFromServer Status L.ByteString
    -- ^ Presumably a server error with its raw body; not constructed in
    -- this module.
  | GetJobResultsProblem SomeException
    -- ^ An exception escaped while requesting job results.
  | RESTRequestNonOK Status Text
    -- ^ A REST call returned a non-200 status; carries the response body.
  | CreateQueryJobProblem SomeException
    -- ^ An exception escaped while creating the query job.
  | ExecuteRunBigQueryProblem BigQueryProblem
    -- ^ 'runBigQuery' itself reported a failure.
  deriving (Show)
|
|
|
|
-- | The monad that queries run in.
--
-- It provides read access to the 'ExecuteReader' environment,
-- short-circuiting failure with 'ExecuteProblem' (via 'MonadError'), and
-- arbitrary 'IO'.
newtype Execute a = Execute
  { unExecute :: ReaderT ExecuteReader (ExceptT ExecuteProblem IO) a
  }
  deriving
    ( Functor
    , Applicative
    , Monad
    , MonadReader ExecuteReader
    , MonadIO
    , MonadError ExecuteProblem
    )
|
|
|
|
-- | The type declared for each query parameter. BigQuery requires every
-- parameter to carry an explicit type annotation.
data BigQueryType
  = DECIMAL
  | INTEGER
  | FLOAT
  | BYTES
  | STRING
  | BOOL
  | ARRAY BigQueryType
    -- ^ An array whose element type is the given type.
  | GEOGRAPHY
  | DATE
  | TIMESTAMP
  | DATETIME
  | TIME
  | BIGDECIMAL
  deriving (Show, Eq)
|
|
|
|
-- | A rendered query, ready to be submitted as a BigQuery job.
data BigQuery = BigQuery
  { query :: !LT.Text
    -- ^ The SQL text.
  , parameters :: !(InsOrdHashMap ParameterName Parameter)
    -- ^ Named parameters referenced by the SQL text.
  , cardinality :: BigQuery.Cardinality
  }
  deriving (Show)
|
|
|
|
-- | A query parameter value together with the BigQuery type declared for it.
data Parameter = Parameter
  { typ :: !BigQueryType
  , value :: !Value
  }
  deriving (Show)
|
|
|
|
-- | The name of a query parameter, as produced by 'ToQuery.paramName'.
newtype ParameterName = ParameterName LT.Text
  deriving (Show, Aeson.ToJSON, Ord, Eq, Hashable)
|
|
|
|
-- | One column of a result schema, as described by BigQuery.
data BigQueryField = BigQueryField
  { name :: !FieldNameText
    -- ^ Column name.
  , typ :: !BigQueryFieldType
    -- ^ Column type, used to pick the cell decoder.
  , mode :: !Mode
    -- ^ Whether the column is nullable, required, or repeated.
  }
  deriving (Show)
|
|
|
|
-- | Column types that can appear in a result schema. The 'Aeson.FromJSON'
-- instance for 'BigQueryField' decides which BigQuery type names map to
-- each constructor.
data BigQueryFieldType
  = FieldSTRING
  | FieldBYTES
  | FieldINTEGER
  | FieldFLOAT
  | FieldBOOL
  | FieldTIMESTAMP
  | FieldDATE
  | FieldTIME
  | FieldDATETIME
  | FieldGEOGRAPHY
  | FieldDECIMAL
  | FieldBIGDECIMAL
  | FieldSTRUCT (Vector BigQueryField)
    -- ^ A struct (RECORD) column with the given nested fields.
  deriving (Show)
|
|
|
|
-- | How a column's cells are decoded.
data Mode
  = Nullable
    -- ^ A JSON @null@ cell decodes to 'NullOutputValue'.
  | NotNullable
    -- ^ A JSON @null@ cell is passed to the type-specific parser.
  | Repeated
    -- ^ The cell holds an array of values ('ArrayOutputValue').
  deriving (Show)
|
|
|
|
-- | Whether a JSON @null@ payload is accepted as 'NullOutputValue'
-- ('IsNullable') or passed on to the value parser ('IsRequired').
data IsNullable
  = IsNullable
  | IsRequired
|
|
|
|
--------------------------------------------------------------------------------
|
|
-- Constants
|
|
|
|
-- | How long to wait, in whole seconds, before polling again for the
-- results of a job that is not finished yet.
streamDelaySeconds :: Int
streamDelaySeconds = 1
|
|
|
|
--------------------------------------------------------------------------------
|
|
-- Executing the planned actions forest
|
|
|
|
-- | Run an 'Execute' action against the given BigQuery source.
--
-- The final field projection ('getFinalRecordSet') is applied to the record
-- set the action produces. Failures raised through 'MonadError' are
-- returned as 'Left'.
runExecute ::
     MonadIO m
  => BigQuerySourceConfig
  -> Execute RecordSet
  -> m (Either ExecuteProblem RecordSet)
runExecute sourceConfig action =
  liftIO $ runExceptT $ runReaderT (unExecute projected) environment
  where
    projected = action >>= getFinalRecordSet
    environment = ExecuteReader {credentials = sourceConfig}
|
|
|
|
-- | Run a single 'Select' against BigQuery.
--
-- Any 'ExecuteProblem' is rethrown in 'Execute'. The resulting record set
-- is tagged with the select's final wanted fields, so that
-- 'getFinalRecordSet' can project them later.
executeSelect :: Select -> Execute RecordSet
executeSelect select = do
  sourceConfig <- asks credentials
  outcome <- streamBigQuery sourceConfig (selectToBigQuery select)
  fetched <- liftEither outcome
  pure fetched {wantedFields = selectFinalWantedFields select}
|
|
|
|
-- | Drop every field not listed in 'wantedFields' from each row. All fields
-- are kept when 'wantedFields' is 'Nothing'.
--
-- This strips fields (e.g. join keys) that the final query selected but
-- the caller does not want. It is a relic of the data-loader approach. A
-- cleaner fix would have the FromIr code re-select exactly the wanted
-- fields, but that code is more delicate and was deliberately left alone.
getFinalRecordSet :: RecordSet -> Execute RecordSet
getFinalRecordSet recordSet =
  pure recordSet {rows = OMap.filterWithKey isWanted <$> rows recordSet}
  where
    isWanted (FieldNameText fieldName) _ =
      case wantedFields recordSet of
        Nothing -> True
        Just wanted -> fieldName `elem` wanted
|
|
|
|
--------------------------------------------------------------------------------
|
|
-- Make a big query from a select
|
|
|
|
-- | Render a 'Select' into a 'BigQuery' job description.
--
-- The description holds the SQL text, its named parameters (each typed
-- with 'valueType'), and the select's cardinality.
selectToBigQuery :: Select -> BigQuery
selectToBigQuery select =
  BigQuery
    { query = LT.toLazyText renderedQuery
    , parameters = OMap.fromList (map toNamedParameter (OMap.toList renderedParams))
    , cardinality = selectCardinality select
    }
  where
    (renderedQuery, renderedParams) =
      ToQuery.renderBuilderPretty (ToQuery.fromSelect select)
    toNamedParameter (index, paramValue) =
      ( ParameterName (LT.toLazyText (ToQuery.paramName index))
      , Parameter {typ = valueType paramValue, value = paramValue}
      )
|
|
|
|
--------------------------------------------------------------------------------
|
|
-- Type system
|
|
|
|
-- | Pick the BigQuery type to declare for a query parameter value.
valueType :: Value -> BigQueryType
valueType (DecimalValue {}) = DECIMAL
valueType (BigDecimalValue {}) = BIGDECIMAL
valueType (IntegerValue {}) = INTEGER
valueType (FloatValue {}) = FLOAT
valueType (GeographyValue {}) = GEOGRAPHY
valueType (StringValue {}) = STRING
valueType (BytesValue {}) = BYTES
valueType (BoolValue {}) = BOOL
valueType (DatetimeValue {}) = DATETIME
valueType (TimeValue {}) = TIME
valueType (DateValue {}) = DATE
valueType (TimestampValue {}) = TIMESTAMP
valueType (ArrayValue values) =
  -- The element type is taken from the first element; the remaining
  -- elements are assumed (not checked) to have the same type. An empty
  -- array gets STRING, because the element type is irrelevant then.
  ARRAY (maybe STRING valueType (values V.!? 0))
valueType NullValue =
  -- The declared type of a null does not matter, so STRING is used.
  STRING
|
|
|
|
--------------------------------------------------------------------------------
|
|
-- JSON serialization
|
|
|
|
-- | Encode a parameter value in the JSON shape sent as a query parameter
-- value.
--
-- Scalars become @{"value": ...}@. Arrays become @{"array_values": [...]}@,
-- with each element encoded recursively. Booleans are sent as the strings
-- @"true"@ / @"false"@.
valueToBigQueryJson :: Value -> Aeson.Value
valueToBigQueryJson =
  \case
    -- TODO: I haven't tested whether BigQuery is happy with this null value.
    NullValue -> Aeson.Null
    DecimalValue d -> scalar d
    BigDecimalValue d -> scalar d
    IntegerValue n -> scalar n
    FloatValue x -> scalar x
    TimestampValue t -> scalar t
    DateValue (Date d) -> scalar d
    TimeValue (Time t) -> scalar t
    DatetimeValue (Datetime dt) -> scalar dt
    GeographyValue (Geography g) -> scalar g
    StringValue s -> scalar (Aeson.String s)
    BytesValue b -> scalar b
    BoolValue b -> scalar (Aeson.String (if b then "true" else "false"))
    ArrayValue elements ->
      Aeson.object ["array_values" .= Aeson.Array (fmap valueToBigQueryJson elements)]
  where
    scalar :: Aeson.ToJSON a => a -> Aeson.Value
    scalar payload = Aeson.object ["value" .= payload]
|
|
|
|
--------------------------------------------------------------------------------
|
|
-- Execute a query as a job and stream the results into a record set
|
|
|
|
-- | Run a query as a BigQuery job and collect every page of its results
-- into a single 'RecordSet'.
--
-- While the job reports itself incomplete, the same page is requested again
-- after sleeping 'streamDelaySeconds'. There is no upper bound on the number
-- of polls. Pages are concatenated in the order they arrive.
--
-- TODO: WARNING: this has not been tested on Big Data(tm). BigQuery could
-- not be made to produce paginated results (responses containing a
-- 'pageToken'), so until that has been tested the pagination path should be
-- considered preliminary.
streamBigQuery ::
     MonadIO m => BigQuerySourceConfig -> BigQuery -> m (Either ExecuteProblem RecordSet)
streamBigQuery credentials bigquery = do
  created <- createQueryJob credentials bigquery
  case created of
    Left problem -> pure (Left problem)
    Right job -> collectPages job Nothing Nothing
  where
    collectPages job pageToken soFar = do
      response <- getJobResults credentials job Fetch {pageToken}
      case response of
        Left problem -> pure (Left problem)
        Right JobIncomplete {} -> do
          liftIO (threadDelay (1000 * 1000 * streamDelaySeconds))
          collectPages job pageToken soFar
        Right (JobComplete JobResults {pageToken = nextToken, recordSet = page@RecordSet {}}) -> do
          let combined = maybe page (`appendPage` page) soFar
          case nextToken of
            Nothing -> pure (Right combined)
            Just token -> collectPages job (Just token) (Just combined)
    -- Keep everything from the earlier record set, extending its rows.
    appendPage earlier later = earlier {rows = rows earlier <> rows later}
|
|
|
|
--------------------------------------------------------------------------------
|
|
-- Querying results from a job
|
|
|
|
-- | One page of results from a completed job.
data JobResults = JobResults
  { pageToken :: Maybe Text
    -- ^ Token for the next page; 'Nothing' when there are no more pages.
  , recordSet :: RecordSet
    -- ^ The rows of this page.
  }
  deriving (Show)
|
|
|
|
-- | Decode one page of results.
--
-- A @pageToken@ that is absent and one that is the empty string both
-- decode to 'Nothing'.
instance Aeson.FromJSON JobResults where
  parseJSON =
    Aeson.withObject "JobResults" $ \o -> do
      recordSet <- parseRecordSetPayload o
      rawToken <- o .:? "pageToken"
      let pageToken =
            case rawToken of
              Just token | not (T.null token) -> Just token
              _ -> Nothing
      pure JobResults {..}
|
|
|
|
-- | The outcome of polling a job: either still running, or finished with a
-- page of results.
data JobResultsResponse
  = JobIncomplete
  | JobComplete JobResults
  deriving (Show)
|
|
|
|
-- | Decode a query-results response.
--
-- The @kind@ must be @bigquery#getQueryResultsResponse@. Then @jobComplete@
-- decides between 'JobIncomplete' and decoding the whole object as
-- 'JobResults'.
instance Aeson.FromJSON JobResultsResponse where
  parseJSON json = Aeson.withObject "JobResultsResponse" fromObject json
    where
      fromObject o = do
        kind <- o .: "kind"
        if kind /= ("bigquery#getQueryResultsResponse" :: Text)
          then fail ("Invalid kind: " <> show kind)
          else do
            complete <- o .: "jobComplete"
            if complete
              then JobComplete <$> Aeson.parseJSON json
              else pure JobIncomplete
|
|
|
|
-- | Pagination input for 'getJobResults'.
data Fetch = Fetch
  { pageToken :: Maybe Text
    -- ^ Page to request; 'Nothing' sends no page token.
  }
  deriving (Show)
|
|
|
|
-- | Fetch one page of a job's query results from the @jobs.getQueryResults@
-- endpoint.
--
-- The request carries the job's @location@ and, when given, the page token.
-- These query-string values are percent-encoded by 'setRequestQueryString',
-- so tokens containing reserved characters (e.g. @+@, @/@, @=@) reach
-- BigQuery intact.
--
-- Error reporting:
--
-- * Exceptions raised while performing the request are caught and reported
--   as 'GetJobResultsProblem'.
-- * A failure from 'runBigQuery' becomes 'ExecuteRunBigQueryProblem'.
-- * A non-200 response becomes 'RESTRequestNonOK' with the response body.
-- * An undecodable 200 response becomes 'GetJobDecodeProblem'.
getJobResults ::
     MonadIO m
  => BigQuerySourceConfig
  -> Job
  -> Fetch
  -> m (Either ExecuteProblem JobResultsResponse)
getJobResults sc@BigQuerySourceConfig {..} Job {jobId, location} Fetch {pageToken} =
  liftIO (catchAny run (pure . Left . GetJobResultsProblem))
  where
    -- https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/getQueryResults
    url =
      "GET https://bigquery.googleapis.com/bigquery/v2/projects/" <>
      T.unpack _scProjectId <>
      "/queries/" <>
      T.unpack jobId
    -- Built as a structured query so every value is URL-encoded, rather than
    -- being spliced raw into the URL string.
    queryItems =
      [ ("alt", Just "json")
      , ("location", Just (txtToBs location))
      ] <>
      maybe [] (\token -> [("pageToken", Just (txtToBs token))]) pageToken
    run = do
      let req =
            setRequestHeader "Content-Type" ["application/json"] $
            setRequestQueryString queryItems $
            parseRequest_ url
      eResp <- runBigQuery sc req
      case eResp of
        Left e -> pure (Left (ExecuteRunBigQueryProblem e))
        Right resp ->
          case getResponseStatusCode resp of
            200 ->
              case Aeson.eitherDecode (getResponseBody resp) of
                Left e -> pure (Left (GetJobDecodeProblem e))
                Right results -> pure (Right results)
            _ ->
              pure $ Left $ RESTRequestNonOK (getResponseStatus resp) $ lbsToTxt $ getResponseBody resp
|
|
|
|
--------------------------------------------------------------------------------
|
|
-- Creating jobs
|
|
|
|
-- | The parts of a BigQuery job resource that this module uses.
data Job = Job
  { state :: !Text
    -- ^ The job's @status.state@ when it was decoded.
  , jobId :: !Text
  , location :: !Text
    -- ^ Needed in addition to the job id when fetching results.
  }
  deriving (Show)
|
|
|
|
-- | Decode a job resource.
--
-- The @kind@ must be @bigquery#job@. The state comes from @status.state@,
-- and the id and location come from @jobReference@.
instance Aeson.FromJSON Job where
  parseJSON =
    Aeson.withObject "Job" $ \o -> do
      kind <- o .: "kind"
      if kind /= ("bigquery#job" :: Text)
        then fail ("Invalid kind: " <> show kind)
        else do
          status <- o .: "status"
          state <- status .: "state"
          ref <- o .: "jobReference"
          -- Querying a job's status needs its 'location' as well as its id.
          jobId <- ref .: "jobId"
          location <- ref .: "location"
          pure Job {state, jobId, location}
|
|
|
|
-- | Submit the query to BigQuery as a new job and return the decoded 'Job'
-- descriptor, without waiting for the job to finish.
--
-- Parameters are sent in NAMED mode. Each carries its declared type, and
-- its value is encoded by 'valueToBigQueryJson'.
--
-- Exceptions raised while making the request are reported as
-- 'CreateQueryJobProblem'. A non-200 response becomes 'RESTRequestNonOK'
-- carrying the response body.
createQueryJob :: MonadIO m => BigQuerySourceConfig -> BigQuery -> m (Either ExecuteProblem Job)
createQueryJob sc@BigQuerySourceConfig {..} BigQuery {..} =
  liftIO (catchAny submit (pure . Left . CreateQueryJobProblem))
  where
    submit = do
      let url =
            "POST https://content-bigquery.googleapis.com/bigquery/v2/projects/" <>
            T.unpack _scProjectId <>
            "/jobs?alt=json"
          request =
            setRequestHeader "Content-Type" ["application/json"] $
            setRequestBodyLBS body $
            parseRequest_ url
      outcome <- runBigQuery sc request
      pure $
        case outcome of
          Left problem -> Left (ExecuteRunBigQueryProblem problem)
          Right response
            | getResponseStatusCode response == 200 ->
                case Aeson.eitherDecode (getResponseBody response) of
                  Left err -> Left (CreateQueryJobDecodeProblem err)
                  Right job -> Right job
            | otherwise ->
                Left $ RESTRequestNonOK (getResponseStatus response) $ lbsToTxt $ getResponseBody response
    body =
      Aeson.encode $
        Aeson.object
          [ "configuration" .=
              Aeson.object
                [ "jobType" .= "QUERY"
                , "query" .= queryConfig
                ]
          ]
    queryConfig =
      Aeson.object
        [ "query" .= query
        , "useLegacySql" .= False -- Important, it makes `quotes` work properly.
        , "parameterMode" .= "NAMED"
        , "queryParameters" .= map parameterJson (OMap.toList parameters)
        ]
    parameterJson (name, Parameter {..}) =
      Aeson.object
        [ "name" .= Aeson.toJSON name
        , "parameterType" .= Aeson.toJSON typ
        , "parameterValue" .= valueToBigQueryJson value
        ]
|
|
|
|
--------------------------------------------------------------------------------
|
|
-- Consuming a record set from BigQuery
|
|
|
|
-- | Decode the schema and rows of a results object into a 'RecordSet'.
--
-- No projection is set ('wantedFields' is 'Nothing'). A missing @rows@ key
-- is treated as no rows. Errors are annotated with the row's index.
parseRecordSetPayload :: Aeson.Object -> Aeson.Parser RecordSet
parseRecordSetPayload resp = do
  schema <- resp .: "schema"
  columns <- schema .: "fields" :: Aeson.Parser (Vector BigQueryField)
  maybeRows <- resp .:? "rows" :: Aeson.Parser (Maybe (Vector Aeson.Value))
  let rowsJSON = fromMaybe V.empty maybeRows
  parsedRows <-
    V.imapM (\index row -> parseRow columns row Aeson.<?> Aeson.Index index) rowsJSON
      Aeson.<?> Aeson.Key "rows"
  pure RecordSet {rows = parsedRows, wantedFields = Nothing}
|
|
|
|
--------------------------------------------------------------------------------
|
|
-- Schema-driven JSON deserialization
|
|
|
|
-- | Decode a top-level row into a map from column name to value.
-- Fails unless the row decodes as a record.
parseRow :: Vector BigQueryField -> Aeson.Value -> Aeson.Parser (InsOrdHashMap FieldNameText OutputValue)
parseRow columnTypes rowJson =
  parseBigQueryRow columnTypes rowJson >>= \case
    RecordOutputValue row -> pure row
    _ -> fail ("Expected a record when parsing a top-level row: " ++ show rowJson)
|
|
|
|
-- | Decode a struct object whose @f@ array holds one cell per column.
-- Cells are paired positionally with the given columns.
--
-- At the top level of the "rows" output a row has no @{"v": ...}@ wrapper.
-- When a struct appears nested, it does have the wrapper; see
-- 'parseBigQueryValue', which removes it before calling this.
parseBigQueryRow :: Vector BigQueryField -> Aeson.Value -> Aeson.Parser OutputValue
parseBigQueryRow columnTypes =
  Aeson.withObject "RECORD" $ \o -> do
    cells <- o .: "f" Aeson.<?> Aeson.Key "RECORD"
    let parseCell index column cell =
          parseBigQueryField column cell Aeson.<?> Aeson.Index index
    named <- sequence (V.izipWith parseCell columnTypes cells) Aeson.<?> Aeson.Key "f"
    pure (RecordOutputValue (OMap.fromList (V.toList named)))
|
|
|
|
-- | Decode one @{"v": ...}@-wrapped cell according to its column type.
--
-- A @null@ payload becomes 'NullOutputValue' only when the column is
-- 'IsNullable' (see 'has_v'). Parse errors are annotated with the type name
-- of the column.
parseBigQueryValue :: IsNullable -> BigQueryFieldType -> Aeson.Value -> Aeson.Parser OutputValue
parseBigQueryValue isNullable fieldType wrapped =
  has_v isNullable parsePayload wrapped Aeson.<?> Aeson.Key typeLabel
  where
    (typeLabel, parsePayload) =
      case fieldType of
        FieldSTRUCT types -> ("RECORD", parseBigQueryRow types)
        FieldDECIMAL -> ("DECIMAL", fmap DecimalOutputValue . Aeson.parseJSON)
        FieldBIGDECIMAL -> ("BIGDECIMAL", fmap BigDecimalOutputValue . Aeson.parseJSON)
        FieldINTEGER -> ("INTEGER", fmap IntegerOutputValue . Aeson.parseJSON)
        FieldDATE -> ("DATE", fmap DateOutputValue . Aeson.parseJSON)
        FieldTIME -> ("TIME", fmap TimeOutputValue . Aeson.parseJSON)
        FieldDATETIME -> ("DATETIME", fmap DatetimeOutputValue . Aeson.parseJSON)
        FieldTIMESTAMP ->
          ( "TIMESTAMP"
          , fmap (TimestampOutputValue . Timestamp . utctimeToISO8601Text)
              . Aeson.withText "FieldTIMESTAMP" textToUTCTime
          )
        FieldGEOGRAPHY -> ("GEOGRAPHY", fmap GeographyOutputValue . Aeson.parseJSON)
        FieldFLOAT -> ("FLOAT", fmap FloatOutputValue . Aeson.parseJSON)
        -- BOOL cells arrive as strings; only "true" maps to True.
        FieldBOOL -> ("BOOL", fmap (BoolOutputValue . (== "true")) . Aeson.parseJSON)
        FieldSTRING -> ("STRING", fmap TextOutputValue . Aeson.parseJSON)
        FieldBYTES -> ("BYTES", fmap BytesOutputValue . Aeson.parseJSON)
|
|
|
|
-- | Decode one cell of a row into its column name and value, according to
-- the column's 'Mode'.
--
-- A repeated cell is a @{"v": [...]}@ wrapper around an array of
-- individually wrapped, non-nullable elements.
parseBigQueryField :: BigQueryField -> Aeson.Value -> Aeson.Parser (FieldNameText, OutputValue)
parseBigQueryField BigQueryField {name, typ, mode} cell =
  case mode of
    Repeated -> parseRepeated Aeson.<?> Aeson.Key "REPEATED"
    Nullable -> withName (parseBigQueryValue IsNullable typ cell Aeson.<?> Aeson.Key "NULLABLE")
    NotNullable -> withName (parseBigQueryValue IsRequired typ cell Aeson.<?> Aeson.Key "REQUIRED")
  where
    withName = fmap (\output -> (name, output))
    parseRepeated = do
      elements <- has_v_generic Aeson.parseJSON cell
      outputs <-
        V.imapM
          (\index element -> parseBigQueryValue IsRequired typ element Aeson.<?> Aeson.Index index)
          elements
      pure (name, ArrayOutputValue outputs)
|
|
|
|
-- | Unwrap the @{"v": ...}@ object around a cell and decode the payload
-- with @f@. Every value below the top-level row is wrapped this way.
--
-- A @null@ payload yields 'NullOutputValue' directly when the column is
-- 'IsNullable'. For 'IsRequired' it is handed to @f@ like any other payload.
has_v ::
     IsNullable
  -> (Aeson.Value -> Aeson.Parser OutputValue)
  -> Aeson.Value
  -> Aeson.Parser OutputValue
has_v isNullable f =
  Aeson.withObject "HAS_V" $ \o -> do
    payload <- o .: "v"
    case (payload, isNullable) of
      (Aeson.Null, IsNullable) -> pure NullOutputValue
      _ -> f payload Aeson.<?> Aeson.Key "v"
|
|
|
|
-- | Unwrap the @{"v": ...}@ object around a value and decode the payload
-- with @f@. Unlike 'has_v', there is no special handling of @null@.
has_v_generic ::
     (Aeson.Value -> Aeson.Parser a)
  -> Aeson.Value
  -> Aeson.Parser a
has_v_generic f =
  Aeson.withObject "HAS_V" $ \o -> do
    payload <- o .: "v"
    f payload Aeson.<?> Aeson.Key "v"
|
|
|
|
--------------------------------------------------------------------------------
|
|
-- Generic JSON serialization and deserialization
|
|
|
|
-- | Encode a parameter type as @{"type": NAME}@. Arrays additionally carry
-- their element type under @arrayType@.
--
-- DECIMAL and BIGDECIMAL are sent under BigQuery's names NUMERIC and
-- BIGNUMERIC.
instance Aeson.ToJSON BigQueryType where
  toJSON bqType =
    case bqType of
      ARRAY elementType ->
        Aeson.object ["type" .= ("ARRAY" :: Text), "arrayType" .= elementType]
      DECIMAL -> scalar "NUMERIC"
      BIGDECIMAL -> scalar "BIGNUMERIC"
      INTEGER -> scalar "INTEGER"
      DATE -> scalar "DATE"
      TIME -> scalar "TIME"
      DATETIME -> scalar "DATETIME"
      TIMESTAMP -> scalar "TIMESTAMP"
      FLOAT -> scalar "FLOAT"
      GEOGRAPHY -> scalar "GEOGRAPHY"
      STRING -> scalar "STRING"
      BYTES -> scalar "BYTES"
      BOOL -> scalar "BOOL"
    where
      scalar :: Text -> Aeson.Value
      scalar typeName = Aeson.object ["type" .= typeName]
|
|
|
|
-- | Decode a column description from a result schema.
--
-- Several spellings are accepted for the same type (e.g. both @INT64@ and
-- @INTEGER@). @RECORD@ / @STRUCT@ columns recursively decode their nested
-- @fields@. Unknown type names fail to parse.
--
-- @mode@ is optional in BigQuery's schema format and defaults to NULLABLE,
-- so a missing @mode@ decodes to 'Nullable' instead of failing the parse.
instance Aeson.FromJSON BigQueryField where
  parseJSON =
    Aeson.withObject "BigQueryField" $ \o -> do
      name <- o .: "name"
      typ <- do
        flag :: Text <- o .: "type"
        if | flag == "NUMERIC" || flag == "DECIMAL" -> pure FieldDECIMAL
           | flag == "BIGNUMERIC" || flag == "BIGDECIMAL" -> pure FieldBIGDECIMAL
           | flag == "INT64" || flag == "INTEGER" -> pure FieldINTEGER
           | flag == "FLOAT64" || flag == "FLOAT" -> pure FieldFLOAT
           | flag == "BOOLEAN" || flag == "BOOL" -> pure FieldBOOL
           | flag == "STRING" -> pure FieldSTRING
           | flag == "DATE" -> pure FieldDATE
           | flag == "TIME" -> pure FieldTIME
           | flag == "DATETIME" -> pure FieldDATETIME
           | flag == "TIMESTAMP" -> pure FieldTIMESTAMP
           | flag == "GEOGRAPHY" -> pure FieldGEOGRAPHY
           | flag == "BYTES" -> pure FieldBYTES
           | flag == "RECORD" || flag == "STRUCT" -> FieldSTRUCT <$> o .: "fields"
           | otherwise -> fail ("Unsupported field type: " ++ show flag)
      -- "mode" may be omitted (BigQuery's documented default is NULLABLE);
      -- an absent or null mode must not make the whole schema unparseable.
      mode <- fromMaybe Nullable <$> o .:? "mode"
      pure BigQueryField {..}
|
|
|
|
-- | Decode a column mode from its string form. @NULLABLE@ and @REPEATED@
-- are recognised; any other string (including @REQUIRED@) is treated as
-- 'NotNullable'.
instance Aeson.FromJSON Mode where
  parseJSON = fmap toMode . Aeson.parseJSON
    where
      toMode :: Text -> Mode
      toMode "NULLABLE" = Nullable
      toMode "REPEATED" = Repeated
      toMode _ = NotNullable
|