mirror of
https://github.com/hasura/graphql-engine.git
synced 2025-01-05 14:27:59 +03:00
2325755954
PR-URL: https://github.com/hasura/graphql-engine-mono/pull/4259 Co-authored-by: Rikin Kachhia <54616969+rikinsk@users.noreply.github.com> Co-authored-by: Brandon Simmons <210815+jberryman@users.noreply.github.com> Co-authored-by: paritosh-08 <85472423+paritosh-08@users.noreply.github.com> GitOrigin-RevId: 4d1b4ec3c01f3a839f4392d3b77950fc3ab30236
60 lines
1.5 KiB
Python
60 lines
1.5 KiB
Python
# Various testing utility functions
|
|
|
|
import time
|
|
|
|
def until_asserts_pass(tries, func, delay=0.3):
    """Call ``func`` repeatedly until it stops raising ``AssertionError``.

    ``func`` is invoked with no arguments, at most ``tries`` times. An
    ``AssertionError`` raised by any attempt other than the last one is
    swallowed and followed by a pause of ``delay`` seconds before the next
    attempt. The last attempt runs unguarded, so its ``AssertionError``
    propagates to the caller. Exceptions of any other type are never caught.

    Args:
        tries: Maximum number of attempts. If this is 0 or negative, ``func``
            is never called and the function returns immediately.
        func: Zero-argument callable containing the assertions.
        delay: Seconds to sleep after a failed, non-final attempt. Defaults
            to 0.3.

    Returns:
        None. Whatever ``func`` returns is discarded.

    Raises:
        AssertionError: If the final attempt still fails.
    """
    for attempt in range(tries):
        # The zero-based attempt index is written to stdout on every try.
        print(attempt)
        if attempt == tries - 1:
            # Last time: raise any assertions in the caller.
            func()
        else:
            try:
                func()
            except AssertionError:
                time.sleep(delay)
            else:
                # All assertions passed; no further attempts are needed.
                break
|
|
|
|
def insert(hge_ctx, table, row, returning=None, headers=None):
    """Insert a single row by delegating to ``insert_many``.

    Args:
        hge_ctx: Context object forwarded to ``insert_many``.
        table: Table identifier forwarded unchanged.
        row: The single object to insert; it is wrapped in a one-element list.
        returning: Columns to return. ``None`` (the default) means an empty
            list.
        headers: Request headers. ``None`` (the default) means an empty dict.

    Returns:
        Whatever ``insert_many`` returns for the one-row insert.
    """
    # Build fresh containers per call instead of sharing mutable defaults
    # between invocations.
    returning = [] if returning is None else returning
    headers = {} if headers is None else headers
    return insert_many(hge_ctx, table, [row], returning, headers)
|
|
|
|
def insert_many(hge_ctx, table, rows, returning=None, headers=None):
    """Send an ``insert`` query for ``rows`` through ``hge_ctx.v1q``.

    Args:
        hge_ctx: Object exposing ``v1q(query, headers=...)``.
        table: Value placed under ``args.table`` in the query.
        rows: Objects placed under ``args.objects`` in the query.
        returning: Value placed under ``args.returning``. ``None`` (the
            default) means an empty list.
        headers: Headers passed to ``v1q``. ``None`` (the default) means an
            empty dict.

    Returns:
        The two values returned by ``hge_ctx.v1q``, as a
        ``(st_code, resp)`` tuple (presumably the HTTP status code and the
        decoded response body; confirm against ``v1q``).
    """
    # Fresh containers per call: a shared default list/dict would carry any
    # mutation made downstream into every later call.
    returning = [] if returning is None else returning
    headers = {} if headers is None else headers
    q = {
        "type": "insert",
        "args": {
            "table": table,
            "objects": rows,
            "returning": returning,
        },
    }
    st_code, resp = hge_ctx.v1q(q, headers=headers)
    return st_code, resp
|
|
|
|
|
|
def update(hge_ctx, table, where_exp, set_exp, headers=None):
    """Send an ``update`` query through ``hge_ctx.v1q``.

    Args:
        hge_ctx: Object exposing ``v1q(query, headers=...)``.
        table: Value placed under ``args.table`` in the query.
        where_exp: Value placed under ``args.where``.
        set_exp: Value placed under ``args.$set``.
        headers: Headers passed to ``v1q``. ``None`` (the default) means an
            empty dict.

    Returns:
        The two values returned by ``hge_ctx.v1q``, as a
        ``(st_code, resp)`` tuple (presumably the HTTP status code and the
        decoded response body; confirm against ``v1q``).
    """
    # A fresh dict per call avoids sharing one mutable default across calls.
    headers = {} if headers is None else headers
    q = {
        "type": "update",
        "args": {
            "table": table,
            "where": where_exp,
            "$set": set_exp,
        },
    }
    st_code, resp = hge_ctx.v1q(q, headers=headers)
    return st_code, resp
|
|
|
|
|
|
def delete(hge_ctx, table, where_exp, headers=None):
    """Send a ``delete`` query through ``hge_ctx.v1q``.

    Args:
        hge_ctx: Object exposing ``v1q(query, headers=...)``.
        table: Value placed under ``args.table`` in the query.
        where_exp: Value placed under ``args.where``.
        headers: Headers passed to ``v1q``. ``None`` (the default) means an
            empty dict.

    Returns:
        The two values returned by ``hge_ctx.v1q``, as a
        ``(st_code, resp)`` tuple (presumably the HTTP status code and the
        decoded response body; confirm against ``v1q``).
    """
    # A fresh dict per call avoids sharing one mutable default across calls.
    headers = {} if headers is None else headers
    q = {
        "type": "delete",
        "args": {
            "table": table,
            "where": where_exp,
        },
    }
    st_code, resp = hge_ctx.v1q(q, headers=headers)
    return st_code, resp
|