from behave import * import logging import os import psycopg2 import psycopg2.extras import subprocess import tempfile from nose.tools import * # for assert functions from sys import version_info as python_version logger = logging.getLogger(__name__) userconfig = { 'BUILDDIR' : os.path.join(os.path.split(__file__)[0], "../../build"), 'REMOVE_TEMPLATE' : False, 'KEEP_TEST_DB' : False, 'DB_HOST' : None, 'DB_PORT' : None, 'DB_USER' : None, 'DB_PASS' : None, 'TEMPLATE_DB' : 'test_template_nominatim', 'TEST_DB' : 'test_nominatim', 'API_TEST_DB' : 'test_api_nominatim', 'TEST_SETTINGS_FILE' : '/tmp/nominatim_settings.php', 'SERVER_MODULE_PATH' : None, 'PHPCOV' : False, # set to output directory to enable code coverage } use_step_matcher("re") class NominatimEnvironment(object): """ Collects all functions for the execution of Nominatim functions. """ def __init__(self, config): self.build_dir = os.path.abspath(config['BUILDDIR']) self.src_dir = os.path.abspath(os.path.join(os.path.split(__file__)[0], "../..")) self.db_host = config['DB_HOST'] self.db_port = config['DB_PORT'] self.db_user = config['DB_USER'] self.db_pass = config['DB_PASS'] self.template_db = config['TEMPLATE_DB'] self.test_db = config['TEST_DB'] self.api_test_db = config['API_TEST_DB'] self.server_module_path = config['SERVER_MODULE_PATH'] self.local_settings_file = config['TEST_SETTINGS_FILE'] self.reuse_template = not config['REMOVE_TEMPLATE'] self.keep_scenario_db = config['KEEP_TEST_DB'] self.code_coverage_path = config['PHPCOV'] self.code_coverage_id = 1 os.environ['NOMINATIM_SETTINGS'] = self.local_settings_file self.template_db_done = False def connect_database(self, dbname): dbargs = {'database': dbname} if self.db_host: dbargs['host'] = self.db_host if self.db_port: dbargs['port'] = self.db_port if self.db_user: dbargs['user'] = self.db_user if self.db_pass: dbargs['password'] = self.db_pass conn = psycopg2.connect(**dbargs) return conn def next_code_coverage_file(self): fn = os.path.join(self.code_coverage_path, "%06d.cov" % self.code_coverage_id) self.code_coverage_id += 1 return fn def write_nominatim_config(self, dbname): f = open(self.local_settings_file, 'w') # https://secure.php.net/manual/en/ref.pdo-pgsql.connection.php f.write("') fd.flush() self.run_setup_script('import-data', 'ignore-errors', 'create-functions', 'create-tables', 'create-partition-tables', 'create-partition-functions', 'load-data', 'create-search-indices', osm_file=fd.name, osm2pgsql_cache='200') except: self.db_drop_database(self.template_db) raise def setup_api_db(self, context): self.write_nominatim_config(self.api_test_db) def setup_unknown_db(self, context): self.write_nominatim_config('UNKNOWN_DATABASE_NAME') def setup_db(self, context): self.setup_template_db() self.write_nominatim_config(self.test_db) conn = self.connect_database(self.template_db) conn.set_isolation_level(0) cur = conn.cursor() cur.execute('DROP DATABASE IF EXISTS %s' % (self.test_db, )) cur.execute('CREATE DATABASE %s TEMPLATE = %s' % (self.test_db, self.template_db)) conn.close() context.db = self.connect_database(self.test_db) if python_version[0] < 3: psycopg2.extras.register_hstore(context.db, globally=False, unicode=True) else: psycopg2.extras.register_hstore(context.db, globally=False) def teardown_db(self, context): if 'db' in context: context.db.close() if not self.keep_scenario_db: self.db_drop_database(self.test_db) def run_setup_script(self, *args, **kwargs): if self.server_module_path: kwargs = dict(kwargs) kwargs['module_path'] = self.server_module_path self.run_nominatim_script('setup', *args, **kwargs) def run_update_script(self, *args, **kwargs): self.run_nominatim_script('update', *args, **kwargs) def run_nominatim_script(self, script, *args, **kwargs): cmd = ['/usr/bin/env', 'php', '-Cq'] cmd.append(os.path.join(self.build_dir, 'utils', '%s.php' % script)) cmd.extend(['--%s' % x for x in args]) for k, v in kwargs.items(): cmd.extend(('--' + k.replace('_', '-'), str(v))) proc = subprocess.Popen(cmd, cwd=self.build_dir, stdout=subprocess.PIPE, stderr=subprocess.PIPE) (outp, outerr) = proc.communicate() outerr = outerr.decode('utf-8').replace('\\n', '\n') logger.debug("run_nominatim_script: %s\n%s\n%s" % (cmd, outp, outerr)) assert (proc.returncode == 0), "Script '%s' failed:\n%s\n%s\n" % (script, outp, outerr) class OSMDataFactory(object): def __init__(self): scriptpath = os.path.dirname(os.path.abspath(__file__)) self.scene_path = os.environ.get('SCENE_PATH', os.path.join(scriptpath, '..', 'scenes', 'data')) self.scene_cache = {} self.clear_grid() def parse_geometry(self, geom, scene): if geom.find(':') >= 0: return "ST_SetSRID(%s, 4326)" % self.get_scene_geometry(scene, geom) if geom.find(',') < 0: out = "POINT(%s)" % self.mk_wkt_point(geom) elif geom.find('(') < 0: line = ','.join([self.mk_wkt_point(x) for x in geom.split(',')]) out = "LINESTRING(%s)" % line else: inner = geom.strip('() ') line = ','.join([self.mk_wkt_point(x) for x in inner.split(',')]) out = "POLYGON((%s))" % line return "ST_SetSRID('%s'::geometry, 4326)" % out def mk_wkt_point(self, point): geom = point.strip() if geom.find(' ') >= 0: return geom else: pt = self.grid_node(int(geom)) assert_is_not_none(pt, "Point not found in grid") return "%f %f" % pt def get_scene_geometry(self, default_scene, name): geoms = [] for obj in name.split('+'): oname = obj.strip() if oname.startswith(':'): assert_is_not_none(default_scene, "You need to set a scene") defscene = self.load_scene(default_scene) wkt = defscene[oname[1:]] else: scene, obj = oname.split(':', 2) scene_geoms = self.load_scene(scene) wkt = scene_geoms[obj] geoms.append("'%s'::geometry" % wkt) if len(geoms) == 1: return geoms[0] else: return 'ST_LineMerge(ST_Collect(ARRAY[%s]))' % ','.join(geoms) def load_scene(self, name): if name in self.scene_cache: return self.scene_cache[name] scene = {} with open(os.path.join(self.scene_path, "%s.wkt" % name), 'r') as fd: for line in fd: if line.strip(): obj, wkt = line.split('|', 2) scene[obj.strip()] = wkt.strip() self.scene_cache[name] = scene return scene def clear_grid(self): self.grid = {} def add_grid_node(self, nodeid, x, y): self.grid[nodeid] = (x, y) def grid_node(self, nodeid): return self.grid.get(nodeid) def before_all(context): # logging setup context.config.setup_logging() # set up -D options for k,v in userconfig.items(): context.config.userdata.setdefault(k, v) logging.debug('User config: %s' %(str(context.config.userdata))) # Nominatim test setup context.nominatim = NominatimEnvironment(context.config.userdata) context.osm = OSMDataFactory() def after_all(context): context.nominatim.cleanup() def before_scenario(context, scenario): if 'DB' in context.tags: context.nominatim.setup_db(context) elif 'APIDB' in context.tags: context.nominatim.setup_api_db(context) elif 'UNKNOWNDB' in context.tags: context.nominatim.setup_unknown_db(context) context.scene = None def after_scenario(context, scenario): if 'DB' in context.tags: context.nominatim.teardown_db(context)