graphql-engine/server/bench-wrk/hge_wrk_bench.py
nizar-m f8a7312a30
Regression benchmarks setup (#3310)
* Benchmark GraphQL queries using wrk

* fix console assets dir

* Store wrk parameters as well

* Add details about storing results in Readme

* Remove files in bench-wrk while computing server shasum

* Instead of just getting maximum throughput per query per version,
create plots using wrk2 for a given set of requests per second.
The maximum throughput is used to see what values of requests per second are feasible.

* Add id for version dropdown

* Allow specifying env and args for GraphQL Engine

1) Arguments given after -- will be passed as arguments to Hasura GraphQL Engine
2) The script will also pass its environment variables on to the Hasura GraphQL Engine instances

Hasura GraphQL Engine can be run with the given environment variables and arguments as follows

$ export HASURA_GRAPHQL_...=....
$ python3 hge_wrk_bench.py -- --hge_arg1 val1 --hge_arg2 val2 ...

* Use matplotlib instead of plotly for figures

* Show throughput graph also.

It may be useful for checking performance regressions across versions

* Support storing results in s3

Use --upload-root-uri 's3://bucket/path' to upload results inside the
given path. When specified, the results, including latencies, the latency
histogram, and the test setup info, will be uploaded to the bucket.
The S3 credentials should be provided as described in the AWS boto3 documentation.
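
For example, with credentials supplied via the standard AWS environment
variables (the bucket name below is only illustrative):

$ export AWS_ACCESS_KEY_ID=<access-key-id>
$ export AWS_SECRET_ACCESS_KEY=<secret-access-key>
$ python3 hge_wrk_bench.py --upload-root-uri 's3://my-bench-bucket/results'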

* Allow specifying a name for the test scenario

* Fix open latency uri bug

* Update wrk docker image

* Keep ylim a little higher than maximum so that the throughput plot is clearly visible

* Show throughput plots for multiple queries at the same time

* 1) Adjust size of dropdowns
2) Make label for requests/sec invisible when plot type is throughput

* 1) Adding boto3 to requirements.txt
2) Removing CPU Key print line
3) Adding info about the tests that will be run with wrk2

* Docker builder for wrk-websocket-server

* Make it optional to set up remote graphql-engine

* Listen on all interfaces and enable ping thread

* Add bench_scripts to wrk-websocket-server docker

* Use 127.0.0.1 instead of 'localhost' to address local hge

For some reason wrk seemed to hang trying to resolve 'localhost'.
ping was able to resolve it fine from the same container, so I'm not sure
what the deal was. Probably some local misconfiguration on my machine, but
maybe this change will also help others.

* Store latency samples in subdirectory, server_shasum just once at start, additional docs

* Add a note on running the benchmarks in the simplest way

* Add a new section on how to run benchmarks on a new Linux-hosted instance

Co-authored-by: Nizar Malangadan <nizar-m@users.noreply.github.com>
Co-authored-by: Brandon Simmons <brandon.m.simmons@gmail.com>
Co-authored-by: Karthikeyan Chinnakonda <karthikeyan@hasura.io>
Co-authored-by: Brandon Simmons <brandon@hasura.io>
Co-authored-by: Vamshi Surabhi <0x777@users.noreply.github.com>
2020-06-19 22:40:17 +05:30


from sportsdb_setup import HGETestSetup, HGETestSetupArgs
from run_hge import HGE
import graphql
import multiprocessing
import json
import os
import docker
import ruamel.yaml as yaml
import cpuinfo
import subprocess
import threading
import time
import datetime
from colorama import Fore, Style
from plot import run_dash_server
import webbrowser
import pathlib
from urllib.parse import urlparse, urlunparse
import boto3
fileLoc = os.path.dirname(os.path.abspath(__file__))
def uri_path_join(uri, *paths):
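# e.g. uri_path_join('s3://bucket/results', '100', 'latencies')
#      -> 's3://bucket/results/100/latencies'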
p = urlparse(uri)
new_path = os.path.join(p.path, *paths)
return urlunparse(p._replace(path=new_path))
class HGEWrkBench(HGETestSetup):
wrk_docker_image = 'hasura/wrk:v0.3'
# We'll bind mount the lua script dir to this directory within the wrk container:
lua_dir = '/tmp/bench_scripts'
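# Fixed request rates (req/s) to benchmark with wrk2; any step at or above
# ~60% of the measured max throughput is skipped (see run_query_benchmarks):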
rps_steps = [10, 20, 50, 100, 200, 500, 1000, 2000, 5000]
def __init__(
self, pg_url, remote_pg_url, pg_docker_image, hge_url=None,
remote_hge_url=None, hge_docker_image=None,
hge_args=None, skip_stack_build=False,
graphql_queries_file='queries.graphql', connections=50,
duration=300, results_hge_url=None, results_hge_admin_secret=None
):
self.load_queries(graphql_queries_file)
super().__init__(
pg_url = pg_url,
remote_pg_url = remote_pg_url,
pg_docker_image = pg_docker_image,
hge_url = hge_url,
remote_hge_url = remote_hge_url,
hge_docker_image = hge_docker_image,
hge_args = hge_args or [],
skip_stack_build = skip_stack_build
)
self.connections = connections
self.duration = duration
self.results_hge_url = results_hge_url
self.results_hge_admin_secret = results_hge_admin_secret
self.extract_cpu_info()
# NOTE: we generally want to do this just once; otherwise if we happen
# to be editing the tree while this script is running the shasum will
# keep changing:
self.server_shasum = self.get_server_shasum()
def load_queries(self, graphql_queries_file):
self.graphql_queries_file = graphql_queries_file
with open(self.graphql_queries_file) as f:
queries = f.read()
self.query_names = []
self.queries = []
for oper in graphql.parse(queries).definitions:
self.query_names.append(oper.name.value)
self.queries.append(oper)
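# wrk2 run parameters: one thread per CPU core, plus the configured
# connection count and test duration.
# e.g. on an 8-core machine with defaults: {'threads': 8, 'connections': 50, 'duration': 300}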
def get_wrk2_params(self):
cpu_count = multiprocessing.cpu_count()
return {
'threads': cpu_count,
'connections': self.connections,
'duration': self.duration
}
def get_current_user(self):
return '{}:{}'.format(os.geteuid(), os.getegid())
def wrk2_test(self, query, rps):
def upload_files(files):
if self.upload_root_uri:
p = urlparse(self.upload_root_uri)
if p.scheme == 's3':
bucket = p.netloc
key = p.path.lstrip('/')
s3_client = boto3.client('s3')
for (f, f_key) in files:
s3_client.upload_file(f, bucket, os.path.join(key, f_key))
query_str = graphql.print_ast(query)
params = self.get_wrk2_params()
print(Fore.GREEN + "Running wrk2 benchmark at {} req/s (duration: {}s) for query\n".format(rps, params['duration']), query_str + Style.RESET_ALL)
bench_script = os.path.join(self.lua_dir, 'bench-wrk2.lua')
graphql_url = self.hge.url + '/v1/graphql'
timestamp = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
results_dir = self.results_root_dir
tests_path = [str(rps), timestamp]
results_dir = os.path.join(results_dir, *tests_path)
os.makedirs(results_dir, exist_ok=True)
wrk2_command = [
'wrk2',
'-R', str(rps),
'-t', str(params['threads']),
'-c', str(params['connections']),
'-d', str(params['duration']),
'--latency',
'-s', bench_script,
graphql_url,
query_str,
results_dir
]
volumes = self.get_scripts_vol()
volumes[results_dir] = {
'bind': results_dir,
'mode': 'rw'
}
self.docker_client = docker.from_env()
result = self.docker_client.containers.run(
self.wrk_docker_image,
detach = False,
stdout = True,
stderr = False,
command = wrk2_command,
network_mode = 'host',
environment = self.get_lua_env(),
volumes = volumes,
remove = True,
user = self.get_current_user()
).decode('ascii')
histogram_file = os.path.join(results_dir, 'latencies.hgrm')
histogram = self.get_latency_histogram(result, histogram_file)
summary_file = os.path.join(results_dir, 'summary.json')
with open(summary_file) as f:
summary = json.load(f)
latencies_file = os.path.join(results_dir, 'latencies')
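# The insert variables wrap nested values Hasura-style as {'data': ...};
# unwrap them so test_setup.json records plain values: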
def extract_data(v):
return v['data'] if isinstance(v, dict) and 'data' in v else v
tests_info = { k:extract_data(v) for (k, v) in self.gen_test_info(query, rps).items() }
tests_setup_file = os.path.join(results_dir, 'test_setup.json')
with open(tests_setup_file, 'w') as f:
f.write(json.dumps(tests_info, indent=2))
upload_files([
(x, os.path.join(*tests_path,y))
for (x,y) in [
(summary_file, 'summary.json'),
(latencies_file, 'latencies'),
(histogram_file, 'latencies.hgrm'),
(tests_setup_file, 'test_setup.json')
]
])
if self.upload_root_uri:
latencies_uri = uri_path_join(self.upload_root_uri, *tests_path, 'latencies')
else:
latencies_uri = pathlib.Path(latencies_file).as_uri()
self.insert_result(query, rps, summary, histogram, latencies_uri)
return (summary, histogram)
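# The parser below walks wrk2's '--latency' output, which includes an
# HdrHistogram "Detailed Percentile spectrum" of roughly this shape
# (values are illustrative):
#
#   Detailed Percentile spectrum:
#        Value   Percentile   TotalCount 1/(1-Percentile)
#
#        0.921     0.000000            1         1.00
#        1.982     0.100000         1952         1.11
#        ...
# #[Mean    =        4.992, StdDeviation   =        1.217]
# #[Max     =       11.256, Total count    =        19517]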
def get_latency_histogram(self, result, write_histogram_file):
const_true = lambda l : True
state_changes = {
'start' : {
(lambda l: 'Detailed Percentile spectrum' in l) : 'histogram_start'
},
'histogram_start': {
(lambda l: 'Value' in l and 'Percentile' in l): 'histogram_headers'
},
'histogram_headers': {
const_true: 'histogram_empty_line'
},
'histogram_empty_line' : {
const_true: 'histogram_values'
},
'histogram_values': {
(lambda l: l.strip().startswith('#')): 'histogram_summary'
},
'histogram_summary': {
(lambda l: not l.strip().startswith('#')): 'histogram_end'
}
}
state = 'start'
histogram = []
print(Fore.CYAN + "Latency histogram summary" + Style.RESET_ALL)
with open(write_histogram_file, 'w') as f:
for line in result.splitlines():
# Change the state
for (check, next_state) in state_changes[state].items():
if check(line):
state = next_state
break
if state == 'start':
continue
elif state == 'histogram_end':
break
if state == 'histogram_summary':
print(Fore.CYAN + line + Style.RESET_ALL)
if state in ['histogram_headers','histogram_values','histogram_summary']:
f.write(line+'\n')
if state == 'histogram_values':
(val, percentile, total_count, _) = line.strip().split()
histogram.append({
'percentile': float(percentile),
'latency': float(val),
'total_count': float(total_count)
})
return histogram
# The appropriate Lua env vars for execution within wrk container:
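# (a trailing ';;' in LUA_PATH/LUA_CPATH is replaced by Lua's default search path)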
def get_lua_env(self):
return {
'LUA_PATH': '/usr/share/lua/5.1/?.lua;' +
os.path.join(self.lua_dir, '?.lua') + ';;',
'LUA_CPATH': '/usr/lib/lua/5.1/?.so;/usr/lib/x86_64-linux-gnu/lua/5.1/?.so;;'
}
def get_scripts_vol(self):
return {
os.path.join(fileLoc, 'wrk-websocket-server', 'bench_scripts'): {
'bind' : self.lua_dir,
'mode' : 'ro'
}
}
def max_rps_test(self, query):
query_str = graphql.print_ast(query)
print(Fore.GREEN + "(Computing maximum requests per second) Running wrk benchmark for query\n", query_str + Style.RESET_ALL)
self.hge.graphql_q(query_str) # Test query once for errors
bench_script = os.path.join(self.lua_dir, 'bench-wrk.lua')
graphql_url = self.hge.url + '/v1/graphql'
params = self.get_wrk2_params()
duration = 30
wrk_command = [
'wrk',
'-t', str(params['threads']),
'-c', str(params['connections']),
'-d', str(duration),
'--latency',
'-s', bench_script,
graphql_url,
query_str
]
self.docker_client = docker.from_env()
result = self.docker_client.containers.run(
self.wrk_docker_image,
detach = False,
stdout = False,
stderr = True,
command = wrk_command,
network_mode = 'host',
environment = self.get_lua_env(),
volumes = self.get_scripts_vol(),
remove = True,
user = self.get_current_user()
)
summary = json.loads(result)['summary']
# TODO explain this calculation. Why aren't we using wrk's reported 'max'? Should we call this avg_sustained_rps or something?
max_rps = round(summary['requests']/float(duration))
self.insert_max_rps_result(query, max_rps)
print("Max RPS", max_rps)
return max_rps
def get_version(self):
script = os.path.join(fileLoc, 'gen-version.sh')
return subprocess.check_output([script]).decode('ascii').strip()
def get_server_shasum(self):
script = os.path.join(fileLoc, 'get-server-sha.sh')
return subprocess.check_output([script]).decode('ascii').strip()
def extract_cpu_info(self):
self.cpu_info = cpuinfo.get_cpu_info()
for k in ['flags', 'python_version', 'hz_actual', 'hz_actual_raw']:
if self.cpu_info.get(k):
del self.cpu_info[k]
def get_results(self):
query = '''
query results {
latency: hge_bench_latest_results {
query_name
requests_per_sec
docker_image
version
latencies_uri
latency_histogram {
percentile
latency
}
}
max_rps: hge_bench_avg_query_max_rps {
query_name
docker_image
version
max_rps
}
}
'''
output = self.results_hge.graphql_q(query)
return output['data']
def set_cpu_info(self, insert_var):
cpu_key = self.cpu_info['brand'] + ' vCPUs: ' + str(self.cpu_info['count'])
insert_var['cpu']= {
'data' : {
'info': self.cpu_info,
'key': cpu_key
},
"on_conflict": {
"constraint": "cpu_info_pkey",
"update_columns": "key"
}
}
def set_query_info(self, insert_var, query):
insert_var["query"] = {
"data": {
"name" : query.name.value,
"query" : graphql.print_ast(query)
},
"on_conflict" : {
"constraint": "gql_query_query_key",
"update_columns": "query"
}
}
#TODO add executable shasum also
def set_version_info(self, insert_var):
if self.hge_docker_image:
insert_var["docker_image"] = self.hge_docker_image
else:
insert_var["version"] = self.get_version()
insert_var["server_shasum"] = self.server_shasum
insert_var['postgres_version'] = self.pg.get_server_version()
if self.scenario_name:
insert_var['scenario_name'] = self.scenario_name
def set_hge_args_env_vars(self, insert_var):
to_hide_env = ['HASURA_GRAPHQL_' + env for env in
[ 'ADMIN_SECRET', 'DATABASE_URL', 'JWT_SECRET']
]
env = { k:v for (k,v) in self.hge.get_hge_env().items() if (k.startswith('HASURA_GRAPHQL') and k not in to_hide_env) or k in ['GHCRTS'] }
args = self.hge.args
insert_var['hge_conf'] = {
'env': env,
'args': args
}
def gen_max_rps_insert_var(self, query, max_rps):
insert_var = dict()
self.set_cpu_info(insert_var)
self.set_query_info(insert_var, query)
self.set_version_info(insert_var)
self.set_hge_args_env_vars(insert_var)
insert_var['max_rps'] = max_rps
insert_var['wrk_parameters'] = self.get_wrk2_params()
return insert_var
def plot_results(self):
def open_plot_in_browser():
time.sleep(1)
webbrowser.open_new_tab('http://127.0.0.1:8050/')
threading.Thread(target=open_plot_in_browser).start()
run_dash_server(self.get_results())
# Collect info about the test environment
def gen_test_info(self, query, rps):
test_info = dict()
self.set_cpu_info(test_info)
self.set_query_info(test_info, query)
self.set_version_info(test_info)
self.set_hge_args_env_vars(test_info)
test_info["requests_per_sec"] = rps
test_info['wrk2_parameters'] = self.get_wrk2_params()
return test_info
def gen_result_insert_var(self, query, rps, summary, latency_histogram, latencies_uri):
insert_var = self.gen_test_info(query, rps)
insert_var["summary"] = summary
insert_var['latency_histogram'] = {
'data' : latency_histogram
}
insert_var['latencies_uri'] = latencies_uri
return insert_var
def insert_result(self, query, rps, summary, latency_histogram, latencies_uri):
result_var = self.gen_result_insert_var(query, rps, summary, latency_histogram, latencies_uri)
insert_query = """
mutation insertResult($result: hge_bench_results_insert_input!) {
insert_hge_bench_results(objects: [$result]){
affected_rows
}
}"""
variables = {'result': result_var}
self.results_hge.graphql_q(insert_query, variables)
def insert_max_rps_result(self, query, max_rps):
result_var = self.gen_max_rps_insert_var(query, max_rps)
insert_query = """
mutation insertMaxRps($result: hge_bench_query_max_rps_insert_input!) {
insert_hge_bench_query_max_rps(objects: [$result]){
affected_rows
}
}"""
variables = {'result': result_var}
self.results_hge.graphql_q(insert_query, variables)
def setup_results_schema(self):
if not self.results_hge_url:
self.results_hge_url = self.hge.url
self.results_hge_admin_secret = self.hge.admin_secret()
if self.results_hge_admin_secret:
results_hge_args = ['--admin-secret', self.results_hge_admin_secret]
else:
results_hge_args = []
self.results_hge = HGE(None, None, args=results_hge_args, log_file=None, url=self.results_hge_url)
results_table = {
'name' : 'results',
'schema': 'hge_bench'
}
if results_table in self.results_hge.get_all_tracked_tables():
return
schema_file = os.path.join(fileLoc, 'results_schema.yaml')
with open(schema_file) as f:
queries = yaml.safe_load(f)
self.results_hge.run_bulk(queries)
def run_query_benchmarks(self):
def get_results_root_dir(query):
if self.hge_docker_image:
ver_info = 'docker-tag-' + self.hge_docker_image.split(':')[1]
else:
ver_info = self.get_version()
query_name = query.name.value
# Store versioned runs under e.g. test_output/benchmark_runs/<hge_version>/
results_root_dir = os.path.abspath(os.path.join(self.work_dir, 'benchmark_runs'))
return os.path.join(results_root_dir, ver_info, query_name)
for query in self.queries:
try:
self.results_root_dir = get_results_root_dir(query)
max_rps = self.max_rps_test(query)
# The tests should not run close to (or above) the maximum requests per second
rps_steps = [r for r in self.rps_steps if r < int(0.6 * max_rps)]
print("Benchmarking queries with wrk2 for the following requests/sec", rps_steps)
for rps in rps_steps:
self.wrk2_test(query, rps)
except Exception:
print(Fore.RED + "Benchmarking Graphql Query '" + query.name.value + "' failed" + Style.RESET_ALL)
raise
def run_tests(self):
with self.graphql_engines_setup():
self.setup_results_schema()
if self.run_benchmarks:
self.run_query_benchmarks()
if not self.skip_plots:
self.plot_results()
class HGEWrkBenchArgs(HGETestSetupArgs):
def __init__(self):
self.set_arg_parse_options()
self.parse_args()
def set_arg_parse_options(self):
HGETestSetupArgs.set_arg_parse_options(self)
self.set_wrk_options()
def parse_args(self):
HGETestSetupArgs.parse_args(self)
self.parse_wrk_options()
def set_wrk_options(self):
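# Parse 'true'/'false' (any case) CLI or env values into real booleans;
# e.g. boolean_string('True') -> True, while boolean_string('yes') raises ValueError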
def boolean_string(s):
s = s.lower()
if s not in {'false', 'true'}:
raise ValueError('Not a valid boolean string')
return s == 'true'
wrk_opts = self.arg_parser.add_argument_group('wrk')
wrk_opts.add_argument('--queries-file', metavar='HASURA_BENCH_QUERIES_FILE', help='Queries file for benchmarks', default='queries.graphql')
wrk_opts.add_argument('--connections', metavar='HASURA_BENCH_CONNECTIONS', help='Total number of open connections', default=50, type=int)
wrk_opts.add_argument('--duration', metavar='HASURA_BENCH_DURATION', help='Duration of tests in seconds', default=300, type=int)
wrk_opts.add_argument('--upload-root-uri', metavar='HASURA_BENCH_UPLOAD_ROOT_URI', help='The URI to which the latency results should be uploaded. Currently only S3 is supported', required=False)
wrk_opts.add_argument('--set-scenario-name', metavar='HASURA_BENCH_SCENARIO_NAME', help='Set a name for the test scenario. This will be shown in logs', required=False)
wrk_opts.add_argument('--results-hge-url', metavar='HASURA_BENCH_RESULTS_HGE_URL', help='The GraphQL engine to which the results should be uploaded', required=False)
wrk_opts.add_argument('--results-hge-admin-secret', metavar='HASURA_BENCH_RESULTS_HGE_ADMIN_SECRET', help='Admin secret of the GraphQL engine to which the results should be uploaded', required=False)
wrk_opts.add_argument('--skip-plots', help='Skip plotting', action='store_true', required=False)
wrk_opts.add_argument('--run-benchmarks', metavar='HASURA_BENCH_RUN_BENCHMARKS', help='Whether benchmarks should be run or not', default=True, type=boolean_string)
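# sts.get_caller_identity() is a cheap read-only call; it's used below to fail
# fast when an s3 upload URI is given but AWS credentials aren't configured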
def get_s3_caller_identity(self):
return boto3.client('sts').get_caller_identity()
def parse_wrk_options(self):
self.connections, self.duration, self.graphql_queries_file, self.results_hge_url, upload_root_uri, self.results_hge_admin_secret, self.run_benchmarks, self.scenario_name = \
self.get_params([
('connections', 'HASURA_BENCH_CONNECTIONS'),
('duration', 'HASURA_BENCH_DURATION'),
('queries_file', 'HASURA_BENCH_QUERIES_FILE'),
('results_hge_url', 'HASURA_BENCH_RESULTS_HGE_URL'),
('upload_root_uri', 'HASURA_BENCH_UPLOAD_ROOT_URI'),
('results_hge_admin_secret', 'HASURA_BENCH_RESULTS_HGE_ADMIN_SECRET'),
('run_benchmarks', 'HASURA_BENCH_RUN_BENCHMARKS'),
('set_scenario_name', 'HASURA_BENCH_SCENARIO_NAME'),
])
self.upload_root_uri = None
if upload_root_uri:
p = urlparse(upload_root_uri)
if p.scheme == 's3':
# Check if aws credentials are set
self.get_s3_caller_identity()
self.upload_root_uri = upload_root_uri
self.skip_plots = self.parsed_args.skip_plots
class HGEWrkBenchWithArgs(HGEWrkBenchArgs, HGEWrkBench):
def __init__(self):
HGEWrkBenchArgs.__init__(self)
HGEWrkBench.__init__(
self,
pg_url = self.pg_url,
remote_pg_url = self.remote_pg_url,
pg_docker_image = self.pg_docker_image,
hge_url = self.hge_url,
remote_hge_url = self.remote_hge_url,
hge_docker_image = self.hge_docker_image,
hge_args = self.hge_args,
skip_stack_build = self.skip_stack_build,
graphql_queries_file = self.graphql_queries_file,
connections = self.connections,
duration = self.duration,
results_hge_url = self.results_hge_url,
results_hge_admin_secret = self.results_hge_admin_secret
)
if __name__ == "__main__":
bench = HGEWrkBenchWithArgs()
bench.run_tests()