graphql-engine/server/tests-py/utils.py
Karthikeyan Chinnakonda 2325755954 server: streaming subscriptions schema generation and tests (incremental PR - 3)
PR-URL: https://github.com/hasura/graphql-engine-mono/pull/4259
Co-authored-by: Rikin Kachhia <54616969+rikinsk@users.noreply.github.com>
Co-authored-by: Brandon Simmons <210815+jberryman@users.noreply.github.com>
Co-authored-by: paritosh-08 <85472423+paritosh-08@users.noreply.github.com>
GitOrigin-RevId: 4d1b4ec3c01f3a839f4392d3b77950fc3ab30236
2022-04-22 19:54:11 +00:00

60 lines
1.5 KiB
Python

# Various testing utility functions
import time
def until_asserts_pass(tries, func, interval=0.3):
    """Call ``func`` repeatedly until it stops raising ``AssertionError``.

    Each attempt prints its zero-based index. A failed attempt that is not
    the last one is followed by a pause of ``interval`` seconds before
    retrying. The last attempt runs unguarded, so its ``AssertionError``
    reaches the caller once the tries are exhausted.

    Args:
        tries: Maximum number of attempts. When it is zero or negative the
            loop body never runs and ``func`` is not called at all.
        func: Zero-argument callable holding the assertions to retry.
        interval: Seconds to sleep after each failed, non-final attempt.
            Defaults to 0.3.

    Raises:
        AssertionError: If ``func`` still fails on the final attempt.
        Exception: Anything other than ``AssertionError`` raised by ``func``
            is not caught and propagates immediately, on any attempt.
    """
    for attempt in range(tries):
        print(attempt)
        if attempt == tries - 1:
            # Last time: raise any assertions in the caller.
            func()
            return
        try:
            func()
        except AssertionError:
            time.sleep(interval)
        else:
            # All assertions passed; no further attempts are needed.
            return
def insert(hge_ctx, table, row, returning=None, headers=None):
    """Insert a single row into ``table`` by delegating to ``insert_many``.

    Args:
        hge_ctx: Context object forwarded to ``insert_many``.
        table: Target table, forwarded unchanged.
        row: The single object to insert; it is wrapped in a one-element list.
        returning: Value for the query's ``"returning"`` field. ``None``
            (the default) means a fresh empty list.
        headers: Request headers. ``None`` (the default) means a fresh empty
            dict.

    Returns:
        Whatever ``insert_many`` returns for the one-row insert.
    """
    # Fresh containers per call: a literal [] / {} default would be one shared
    # object that any downstream mutation would leak into later calls.
    returning = [] if returning is None else returning
    headers = {} if headers is None else headers
    return insert_many(hge_ctx, table, [row], returning, headers)
def insert_many(hge_ctx, table, rows, returning=None, headers=None):
    """Send an ``insert`` query for ``rows`` through ``hge_ctx.v1q``.

    Args:
        hge_ctx: Object exposing ``v1q(query, headers=...)``.
        table: Value placed in the query's ``"table"`` field.
        rows: Value placed in the query's ``"objects"`` field.
        returning: Value placed in the query's ``"returning"`` field.
            ``None`` (the default) means a fresh empty list.
        headers: Headers passed to ``v1q``. ``None`` (the default) means a
            fresh empty dict.

    Returns:
        The ``(status_code, response)`` pair produced by ``hge_ctx.v1q``.
    """
    # Fresh containers per call: a literal [] / {} default would be one shared
    # object that any downstream mutation would leak into later calls.
    if returning is None:
        returning = []
    if headers is None:
        headers = {}
    q = {
        "type": "insert",
        "args": {
            "table": table,
            "objects": rows,
            "returning": returning,
        },
    }
    st_code, resp = hge_ctx.v1q(q, headers=headers)
    return st_code, resp
def update(hge_ctx, table, where_exp, set_exp, headers=None):
    """Send an ``update`` query through ``hge_ctx.v1q``.

    Args:
        hge_ctx: Object exposing ``v1q(query, headers=...)``.
        table: Value placed in the query's ``"table"`` field.
        where_exp: Value placed in the query's ``"where"`` field.
        set_exp: Value placed in the query's ``"$set"`` field.
        headers: Headers passed to ``v1q``. ``None`` (the default) means a
            fresh empty dict.

    Returns:
        The ``(status_code, response)`` pair produced by ``hge_ctx.v1q``.
    """
    # Fresh dict per call: a literal {} default would be one shared object
    # that any downstream mutation would leak into later calls.
    if headers is None:
        headers = {}
    q = {
        "type": "update",
        "args": {
            "table": table,
            "where": where_exp,
            "$set": set_exp,
        },
    }
    st_code, resp = hge_ctx.v1q(q, headers=headers)
    return st_code, resp
def delete(hge_ctx, table, where_exp, headers=None):
    """Send a ``delete`` query through ``hge_ctx.v1q``.

    Args:
        hge_ctx: Object exposing ``v1q(query, headers=...)``.
        table: Value placed in the query's ``"table"`` field.
        where_exp: Value placed in the query's ``"where"`` field.
        headers: Headers passed to ``v1q``. ``None`` (the default) means a
            fresh empty dict.

    Returns:
        The ``(status_code, response)`` pair produced by ``hge_ctx.v1q``.
    """
    # Fresh dict per call: a literal {} default would be one shared object
    # that any downstream mutation would leak into later calls.
    if headers is None:
        headers = {}
    q = {
        "type": "delete",
        "args": {
            "table": table,
            "where": where_exp,
        },
    }
    st_code, resp = hge_ctx.v1q(q, headers=headers)
    return st_code, resp