Mirror of https://github.com/hasura/graphql-engine.git (synced 2024-12-14 08:02:15 +03:00)
Pull request: https://github.com/hasura/graphql-engine/pull/5759
This commit is contained in:
parent 7302abeed0
commit 10f41e7559
@@ -57,6 +57,7 @@ This release contains the [PDV refactor (#4111)](https://github.com/hasura/graph
 - server: add action-like URL templating for event triggers and remote schemas (fixes #2483)
 - server: change `created_at` column type from `timestamp` to `timestamptz` for scheduled triggers tables (fix #5722)
 - server: allow configuring timeouts for actions (fixes #4966)
+- server: accept only non-negative integers for batch size and refetch interval (close #5653) (#5759)
 - console: allow user to cascade Postgres dependencies when dropping Postgres objects (close #5109) (#5248)
 - console: mark inconsistent remote schemas in the UI (close #5093) (#5181)
 - cli: add missing global flags for seeds command (#5565)
@@ -524,6 +524,7 @@ test-suite graphql-engine-tests
     Data.Parser.URLTemplate
     Data.Parser.JSONPathSpec
     Data.TimeSpec
+    Data.NonNegativeIntSpec
     Hasura.IncrementalSpec
     Hasura.RQL.MetadataSpec
     Hasura.Server.MigrateSpec
@@ -101,9 +101,13 @@ module Hasura.GraphQL.Execute.LiveQuery
   , dumpLiveQueriesState

   , LiveQueriesOptions(..)
-  , BatchSize(..)
-  , RefetchInterval(..)
+  , BatchSize
+  , unBatchSize
+  , RefetchInterval
+  , unRefetchInterval
   , mkLiveQueriesOptions
+  , mkBatchSize
+  , mkRefetchInterval

   , LiveQueryId
   , addLiveQuery
@ -1,13 +1,18 @@
|
||||
module Hasura.GraphQL.Execute.LiveQuery.Options
|
||||
( LiveQueriesOptions(..)
|
||||
, BatchSize(..)
|
||||
, RefetchInterval(..)
|
||||
, BatchSize
|
||||
, unBatchSize
|
||||
, RefetchInterval
|
||||
, unRefetchInterval
|
||||
, mkLiveQueriesOptions
|
||||
, mkBatchSize
|
||||
, mkRefetchInterval
|
||||
) where
|
||||
|
||||
import Hasura.Prelude
|
||||
|
||||
import qualified Data.Aeson as J
|
||||
import qualified Data.Aeson as J
|
||||
import Hasura.RQL.Types.Common (NonNegativeDiffTime, NonNegativeInt, mkNonNegativeInt, mkNonNegativeDiffTime)
|
||||
|
||||
data LiveQueriesOptions
|
||||
= LiveQueriesOptions
|
||||
@@ -32,10 +37,16 @@ instance J.FromJSON LiveQueriesOptions where
     LiveQueriesOptions <$> o J..: "batch_size"
                        <*> o J..: "refetch_delay"

-newtype BatchSize = BatchSize { unBatchSize :: Int }
+newtype BatchSize = BatchSize { unBatchSize :: NonNegativeInt }
   deriving (Show, Eq, J.ToJSON, J.FromJSON)

+mkBatchSize :: Int -> Maybe BatchSize
+mkBatchSize x = BatchSize <$> mkNonNegativeInt x
+
 -- TODO this is treated as milliseconds in fromEnv and as seconds in ToJSON.
 -- ideally this would have e.g. ... unRefetchInterval :: Milliseconds
-newtype RefetchInterval = RefetchInterval { unRefetchInterval :: DiffTime }
+newtype RefetchInterval = RefetchInterval { unRefetchInterval :: NonNegativeDiffTime }
   deriving (Show, Eq, J.ToJSON, J.FromJSON)
+
+mkRefetchInterval :: DiffTime -> Maybe RefetchInterval
+mkRefetchInterval x = RefetchInterval <$> mkNonNegativeDiffTime x
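A minimal sketch of how the smart constructors introduced in this hunk are used; the values below are made up, and only the module and function names come from the diff:

```haskell
-- Illustrative only: callers can no longer wrap raw values in the
-- constructors directly, they go through the Maybe-returning helpers.
import Hasura.GraphQL.Execute.LiveQuery.Options (mkBatchSize, mkRefetchInterval)

main :: IO ()
main = do
  print (mkBatchSize 100)         -- Just ... : accepted
  print (mkBatchSize (-5))        -- Nothing  : negative values are rejected
  print (mkRefetchInterval 1)     -- Just ... : 1s given as a DiffTime literal
  print (mkRefetchInterval (-1))  -- Nothing  : negative values are rejected
```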
@@ -70,6 +70,7 @@ import Hasura.Db
 import           Hasura.GraphQL.Execute.LiveQuery.Options
 import           Hasura.GraphQL.Execute.LiveQuery.Plan
 import           Hasura.GraphQL.Transport.HTTP.Protocol
+import           Hasura.RQL.Types.Common                  (getNonNegativeInt)
 import           Hasura.RQL.Types.Error
 import           Hasura.Session
@@ -417,7 +418,7 @@ pollQuery pollerId lqOpts pgExecCtx pgQuery cohortMap postPollHook = do
     cohorts <- STM.atomically $ TMap.toList cohortMap
     cohortSnapshots <- mapM (STM.atomically . getCohortSnapshot) cohorts
     -- cohorts are broken down into batches specified by the batch size
-    pure $ chunksOf (unBatchSize batchSize) cohortSnapshots
+    pure $ chunksOf (getNonNegativeInt (unBatchSize batchSize)) cohortSnapshots

   -- concurrently process each batch
   batchesDetails <- A.forConcurrently cohortBatches $ \cohorts -> do
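A toy illustration of the batching step above: `unBatchSize` now yields a `NonNegativeInt` rather than an `Int`, so it has to be unwrapped with `getNonNegativeInt` before it can drive the chunking. The `chunksOf` here is the split package's, assumed to behave like the helper used in pollQuery; the values are made up:

```haskell
import Data.List.Split         (chunksOf)
import Hasura.RQL.Types.Common (getNonNegativeInt, unsafeNonNegativeInt)

-- Hypothetical stand-ins for the batch size and the cohort snapshots.
batches :: [[Int]]
batches = chunksOf (getNonNegativeInt (unsafeNonNegativeInt 2)) [1 .. 5]
-- evaluates to [[1,2],[3,4],[5]]
```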
@@ -35,6 +35,7 @@ import Hasura.Db
 import           Hasura.GraphQL.Execute.LiveQuery.Options
 import           Hasura.GraphQL.Execute.LiveQuery.Plan
 import           Hasura.GraphQL.Execute.LiveQuery.Poll
+import           Hasura.RQL.Types.Common                   (unNonNegativeDiffTime)

 -- | The top-level datatype that holds the state for all active live queries.
 --
@@ -110,7 +111,7 @@ addLiveQuery logger subscriberMetadata lqState plan onResultAction = do
   pollerId <- PollerId <$> UUID.nextRandom
   threadRef <- forkImmortal ("pollQuery." <> show pollerId) logger $ forever $ do
     pollQuery pollerId lqOpts pgExecCtx query (_pCohorts handler) postPollHook
-    sleep $ unRefetchInterval refetchInterval
+    sleep $ unNonNegativeDiffTime $ unRefetchInterval refetchInterval
   let !pState = PollerIOState threadRef pollerId
 #ifndef PROFILING
   $assertNFHere pState  -- so we don't write thunks to mutable vars
@@ -37,11 +37,17 @@ module Hasura.RQL.Types.Common
   , isSystemDefined

   , successMsg
-  , NonNegativeDiffTime(..)
+  , NonNegativeDiffTime
+  , unNonNegativeDiffTime
+  , unsafeNonNegativeDiffTime
+  , mkNonNegativeDiffTime
   , InputWebhook(..)
   , ResolvedWebhook(..)
   , resolveWebhook

+  , NonNegativeInt
+  , getNonNegativeInt
+  , mkNonNegativeInt
+  , unsafeNonNegativeInt
   , Timeout(..)
   , defaultActionTimeoutSecs
   ) where
@@ -52,7 +58,6 @@ import Hasura.Prelude
 import           Hasura.RQL.DDL.Headers     ()
 import           Hasura.RQL.Types.Error
 import           Hasura.SQL.Types
-import           Hasura.RQL.DDL.Headers     ()


@@ -66,9 +71,9 @@ import Data.URL.Template
 import           Instances.TH.Lift             ()
 import           Language.Haskell.TH.Syntax    (Lift, Q, TExp)

+import qualified Data.Environment              as Env
 import qualified Data.HashMap.Strict           as HM
 import qualified Data.Text                     as T
-import qualified Data.Environment              as Env
 import qualified Database.PG.Query             as Q
 import qualified Language.GraphQL.Draft.Syntax as G
 import qualified Language.Haskell.TH.Syntax    as TH
@@ -282,12 +287,41 @@ isSystemDefined = unSystemDefined
 successMsg :: EncJSON
 successMsg = "{\"message\":\"success\"}"

+newtype NonNegativeInt = NonNegativeInt { getNonNegativeInt :: Int }
+  deriving (Show, Eq, ToJSON, Generic, NFData, Cacheable, Num)
+
+mkNonNegativeInt :: Int -> Maybe NonNegativeInt
+mkNonNegativeInt x = case x >= 0 of
+  True  -> Just $ NonNegativeInt x
+  False -> Nothing
+
+unsafeNonNegativeInt :: Int -> NonNegativeInt
+unsafeNonNegativeInt = NonNegativeInt
+
+instance FromJSON NonNegativeInt where
+  parseJSON = withScientific "NonNegativeInt" $ \t -> do
+    case (t >= 0) of
+      True  -> NonNegativeInt <$> maybeInt (toBoundedInteger t)
+      False -> fail "negative value not allowed"
+    where
+      maybeInt x = case x of
+        Just v  -> return v
+        Nothing -> fail "integer passed is out of bounds"
+
 newtype NonNegativeDiffTime = NonNegativeDiffTime { unNonNegativeDiffTime :: DiffTime }
-  deriving (Show, Eq, ToJSON, Generic, NFData, Cacheable)
+  deriving (Show, Eq, ToJSON, Generic, NFData, Cacheable, Num)
+
+unsafeNonNegativeDiffTime :: DiffTime -> NonNegativeDiffTime
+unsafeNonNegativeDiffTime = NonNegativeDiffTime
+
+mkNonNegativeDiffTime :: DiffTime -> Maybe NonNegativeDiffTime
+mkNonNegativeDiffTime x = case x >= 0 of
+  True  -> Just $ NonNegativeDiffTime x
+  False -> Nothing

 instance FromJSON NonNegativeDiffTime where
   parseJSON = withScientific "NonNegativeDiffTime" $ \t -> do
-    case (t > 0) of
+    case (t >= 0) of
       True  -> return $ NonNegativeDiffTime . realToFrac $ t
       False -> fail "negative value not allowed"

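A small sketch of how the FromJSON instances above behave when fed through aeson's decode; the type and module names are the ones from this hunk, the example values are made up:

```haskell
{-# LANGUAGE OverloadedStrings #-}
-- Illustrative only: negative JSON numbers are rejected by both parsers.
import qualified Data.Aeson              as J
import           Hasura.RQL.Types.Common (NonNegativeDiffTime, NonNegativeInt)

acceptedInt, rejectedInt :: Maybe NonNegativeInt
acceptedInt = J.decode "42"    -- Just ... : 42 >= 0
rejectedInt = J.decode "-42"   -- Nothing  : fails with "negative value not allowed"

zeroTime :: Maybe NonNegativeDiffTime
zeroTime = J.decode "0"        -- Just ... : 0 is accepted now that the check is t >= 0, not t > 0
```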
@@ -18,7 +18,7 @@ import Data.Time.Clock
 import           Data.Time.Clock.Units
 import           Data.Time.Format.ISO8601
 import           Hasura.Incremental
-import           Hasura.RQL.Types.Common     (NonNegativeDiffTime(..))
+import           Hasura.RQL.Types.Common     (NonNegativeDiffTime, unsafeNonNegativeDiffTime)
 import           Hasura.RQL.Types.Action     (InputWebhook(..))
 import           Hasura.Prelude
 import           System.Cron.Types
@@ -51,11 +51,11 @@ instance FromJSON STRetryConf where
   parseJSON = withObject "STRetryConf" \o -> do
     numRetries' <- o .:? "num_retries" .!= 0
     retryInterval <-
-      o .:? "retry_interval_seconds" .!= (NonNegativeDiffTime $ seconds 10)
+      o .:? "retry_interval_seconds" .!= (unsafeNonNegativeDiffTime $ seconds 10)
     timeout <-
-      o .:? "timeout_seconds" .!= (NonNegativeDiffTime $ seconds 60)
+      o .:? "timeout_seconds" .!= (unsafeNonNegativeDiffTime $ seconds 60)
     tolerance <-
-      o .:? "tolerance_seconds" .!= (NonNegativeDiffTime $ hours 6)
+      o .:? "tolerance_seconds" .!= (unsafeNonNegativeDiffTime $ hours 6)
     if numRetries' < 0
     then fail "num_retries cannot be a negative value"
     else pure $ STRetryConf numRetries' retryInterval timeout tolerance
@@ -66,9 +66,9 @@ defaultSTRetryConf :: STRetryConf
 defaultSTRetryConf =
   STRetryConf
   { strcNumRetries = 0
-  , strcRetryIntervalSeconds = NonNegativeDiffTime $ seconds 10
-  , strcTimeoutSeconds = NonNegativeDiffTime $ seconds 60
-  , strcToleranceSeconds = NonNegativeDiffTime $ hours 6
+  , strcRetryIntervalSeconds = unsafeNonNegativeDiffTime $ seconds 10
+  , strcTimeoutSeconds = unsafeNonNegativeDiffTime $ seconds 60
+  , strcToleranceSeconds = unsafeNonNegativeDiffTime $ hours 6
   }

 data CronTriggerMetadata
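Since the NonNegativeDiffTime constructor is no longer exported, these compile-time defaults go through unsafeNonNegativeDiffTime, which skips the Maybe that mkNonNegativeDiffTime would force on values that are obviously non-negative. A minimal sketch of the two paths, assuming the Hasura.RQL.Types.Common exports shown earlier and the seconds helper from Data.Time.Clock.Units used in this file:

```haskell
import Data.Time.Clock.Units   (seconds)
import Hasura.RQL.Types.Common (NonNegativeDiffTime, mkNonNegativeDiffTime,
                                unsafeNonNegativeDiffTime)

-- Checked path: for runtime input; yields Nothing on a negative duration.
checkedInterval :: Maybe NonNegativeDiffTime
checkedInterval = mkNonNegativeDiffTime (seconds 10)

-- Unchecked path: acceptable for literals that are known non-negative,
-- which is why the defaults above use it.
defaultInterval :: NonNegativeDiffTime
defaultInterval = unsafeNonNegativeDiffTime (seconds 10)
```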
@@ -257,10 +257,14 @@ instance FromEnv [API] where
   fromEnv = readAPIs

 instance FromEnv LQ.BatchSize where
-  fromEnv = fmap LQ.BatchSize . readEither
+  fromEnv s = do
+    val <- readEither s
+    maybe (Left "batch size should be a non negative integer") Right $ LQ.mkBatchSize val

 instance FromEnv LQ.RefetchInterval where
-  fromEnv = fmap (LQ.RefetchInterval . milliseconds . fromInteger) . readEither
+  fromEnv x = do
+    val <- fmap (milliseconds . fromInteger) . readEither $ x
+    maybe (Left "refetch interval should be a non negative integer") Right $ LQ.mkRefetchInterval val

 instance FromEnv Milliseconds where
   fromEnv = fmap fromInteger . readEither
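The net effect of the two rewritten instances, as a sketch: fromEnv now returns a Left with a descriptive message for negative (or unparsable) values instead of wrapping them unchecked. The import path for the FromEnv class below is an assumption; only the instance bodies come from this diff:

```haskell
-- Hypothetical usage; assumes fromEnv :: String -> Either String a and the
-- LQ alias for Hasura.GraphQL.Execute.LiveQuery used in this file.
import qualified Hasura.GraphQL.Execute.LiveQuery as LQ
import           Hasura.Server.Init               (FromEnv (..))

parseBatchSize :: String -> Either String LQ.BatchSize
parseBatchSize = fromEnv
-- parseBatchSize "100"  == Right ...   (accepted)
-- parseBatchSize "-100" == Left "batch size should be a non negative integer"
-- parseBatchSize "x"    == Left ...    (readEither failure)
```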
server/src-test/Data/NonNegativeIntSpec.hs (new file, 21 lines)
@@ -0,0 +1,21 @@
+module Data.NonNegativeIntSpec (spec) where
+-- | basic tests on NonNegativeIntType
+
+import Prelude
+
+import Hasura.RQL.Types.Common (mkNonNegativeInt)
+
+import Test.Hspec (Spec, describe, it, shouldBe)
+
+spec :: Spec
+spec = do
+  nonNegIntSpec
+
+nonNegIntSpec :: Spec
+nonNegIntSpec =
+  describe "non negative integer type" $ do
+    it "only validates non negative integers" $ do
+      (mkNonNegativeInt 23) `shouldBe` (Just 23)
+      (mkNonNegativeInt (-23)) `shouldBe` Nothing
+
+-- TODO: add spec for fromJSON for NonNegativeInt type
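A possible shape for the TODO above: a FromJSON spec for NonNegativeInt. This is a hypothetical sketch, not part of the commit; it assumes the FromJSON instance added in Hasura.RQL.Types.Common and the name fromJSONSpec is invented:

```haskell
{-# LANGUAGE OverloadedStrings #-}
-- Hypothetical spec sketch for the TODO; not part of this commit.
import qualified Data.Aeson              as J
import           Data.Maybe              (isJust, isNothing)
import           Hasura.RQL.Types.Common (NonNegativeInt)
import           Test.Hspec              (Spec, describe, it, shouldSatisfy)

fromJSONSpec :: Spec
fromJSONSpec =
  describe "NonNegativeInt FromJSON" $ do
    it "accepts non-negative integers" $
      (J.decode "23" :: Maybe NonNegativeInt) `shouldSatisfy` isJust
    it "rejects negative integers" $
      (J.decode "-23" :: Maybe NonNegativeInt) `shouldSatisfy` isNothing
```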
@@ -31,6 +31,7 @@ import qualified Data.Parser.CacheControlSpec as CacheControlParser
 import qualified Data.Parser.JSONPathSpec     as JsonPath
 import qualified Data.Parser.URLTemplate      as URLTemplate
 import qualified Data.TimeSpec                as TimeSpec
+import qualified Data.NonNegativeIntSpec      as NonNegetiveIntSpec
 import qualified Hasura.IncrementalSpec       as IncrementalSpec
 -- import qualified Hasura.RQL.MetadataSpec as MetadataSpec
 import qualified Hasura.Server.MigrateSpec    as MigrateSpec
@@ -66,6 +67,7 @@ unitSpecs = do
   describe "Hasura.Incremental" IncrementalSpec.spec
   -- describe "Hasura.RQL.Metadata" MetadataSpec.spec -- Commenting until optimizing the test in CI
   describe "Data.Time" TimeSpec.spec
+  describe "Data.NonNegativeInt" NonNegetiveIntSpec.spec
   describe "Hasura.Server.Telemetry" TelemetrySpec.spec
   describe "Hasura.Server.Auth" AuthSpec.spec
   describe "Hasura.Cache.Bounded" CacheBoundedSpec.spec