2019-04-17 19:29:39 +03:00
|
|
|
import pytest
|
|
|
|
import json
|
2019-07-22 15:47:13 +03:00
|
|
|
import jsondiff
|
2022-01-17 10:39:59 +03:00
|
|
|
from ruamel.yaml import YAML
|
|
|
|
|
|
|
|
# Shared ruamel.yaml instance used by the tests below to read the YAML
# fixture file and to render assertion-failure details.
yaml = YAML(typ='safe', pure=True)
|
2019-04-17 19:29:39 +03:00
|
|
|
|
|
|
|
class TestInconsistentObjects:
    """Check how the server reports, rejects and drops inconsistent metadata.

    The scenario is driven by ``test.yaml`` in the directory returned by
    :meth:`dir`; the fixture supplies the ``setup``, ``sql``,
    ``inconsistent_objects`` and ``teardown`` entries read by the test.
    """

    # Request payloads for the metadata API calls issued through ``hge_ctx.v1q``.
    get_inconsistent_metadata = {
        "type": "get_inconsistent_metadata",
        "args": {}
    }
    reload_metadata = {
        "type": "reload_metadata",
        "args": {}
    }
    drop_inconsistent_metadata = {
        "type": "drop_inconsistent_metadata",
        "args": {}
    }
    export_metadata = {
        "type": "export_metadata",
        "args": {}
    }

    @staticmethod
    def _stringify_keys(value):
        """Return a copy of *value* that the safe YAML dumper can represent.

        ``jsondiff.diff`` may use non-scalar marker objects as dictionary
        keys; those keys are replaced by their ``str()`` form. Tuples are
        turned into lists. Nested dicts/lists/tuples are processed
        recursively; every other value is returned unchanged.
        """
        if isinstance(value, dict):
            return {
                (key if key is None or isinstance(key, (str, int, float, bool))
                 else str(key)): TestInconsistentObjects._stringify_keys(item)
                for key, item in value.items()
            }
        if isinstance(value, (list, tuple)):
            return [TestInconsistentObjects._stringify_keys(item) for item in value]
        return value

    @staticmethod
    def _dump_str(data):
        """Serialize *data* with the module-level ``yaml`` instance into a string.

        The ruamel ``YAML.dump`` method needs an explicit stream, so the
        output is collected in an in-memory buffer.
        """
        import io  # local import: keeps this block independent of the file header

        stream = io.StringIO()
        yaml.dump(data, stream)
        return stream.getvalue()

    def test_inconsistent_objects(self, hge_ctx):
        """Run the inconsistent-metadata scenario described by ``test.yaml``.

        Steps:
          1. apply the fixture's ``setup`` query;
          2. run the fixture's ``sql`` and reload the metadata;
          3. expect ``get_inconsistent_metadata`` to report
             ``is_consistent == False`` and exactly the fixture's
             ``inconsistent_objects``;
          4. export the metadata and expect ``replace_metadata`` with that
             export to be rejected with status 400;
          5. drop the inconsistent objects, reload, and expect a consistent
             state with no inconsistent objects;
          6. apply the fixture's ``teardown`` query.

        :param hge_ctx: test context providing ``v1q(q)`` (returns a
            ``(status_code, response)`` pair) and ``sql(statement)``.
        """
        with open(self.dir() + "/test.yaml") as c:
            test = yaml.load(c)

        # setup
        st_code, resp = hge_ctx.v1q(json.loads(json.dumps(test['setup'])))
        assert st_code == 200, resp

        try:
            # exec sql to cause inconsistency
            hge_ctx.sql(test['sql'])

            # reload metadata
            st_code, resp = hge_ctx.v1q(q=self.reload_metadata)
            assert st_code == 200, resp

            # fetch inconsistent objects
            st_code, resp = hge_ctx.v1q(q=self.get_inconsistent_metadata)
            assert st_code == 200, resp
            incons_objs_test = test['inconsistent_objects']
            incons_objs_resp = resp['inconsistent_objects']

            assert resp['is_consistent'] is False, resp
            # The message is only built when the comparison fails. It is
            # rendered through _dump_str/_stringify_keys so that building it
            # cannot itself raise and hide the real mismatch.
            assert incons_objs_resp == incons_objs_test, self._dump_str({
                'response': incons_objs_resp,
                'expected': incons_objs_test,
                'diff': self._stringify_keys(
                    jsondiff.diff(incons_objs_test, incons_objs_resp)
                ),
            })

            # export metadata
            st_code, export = hge_ctx.v1q(q=self.export_metadata)
            assert st_code == 200, export

            # apply metadata: the export is expected to be rejected
            st_code, resp = hge_ctx.v1q(
                q={
                    "type": "replace_metadata",
                    "args": export
                }
            )
            assert st_code == 400, resp

        finally:
            try:
                # drop inconsistent objects
                st_code, resp = hge_ctx.v1q(q=self.drop_inconsistent_metadata)
                assert st_code == 200, resp

                # reload metadata
                st_code, resp = hge_ctx.v1q(q=self.reload_metadata)
                assert st_code == 200, resp

                # fetch inconsistent objects
                st_code, resp = hge_ctx.v1q(q=self.get_inconsistent_metadata)
                assert st_code == 200, resp

                assert resp['is_consistent'] is True, resp
                assert len(resp['inconsistent_objects']) == 0, resp
            finally:
                # teardown: runs even when a cleanup check above fails, so
                # the objects created by ``setup`` are not left behind.
                st_code, resp = hge_ctx.v1q(json.loads(json.dumps(test['teardown'])))
                assert st_code == 200, resp

    @classmethod
    def dir(cls):
        """Return the directory (relative path) holding this test's ``test.yaml``."""
        return 'queries/inconsistent_objects'
|