# SPDX-License-Identifier: GPL-2.0-only # # This file is part of Nominatim. (https://nominatim.org) # # Copyright (C) 2022 by the Nominatim developer community. # For a full list of authors see the git log. """ Functions for setting up and importing a new Nominatim database. """ import logging import os import selectors import subprocess from pathlib import Path import psutil from psycopg2 import sql as pysql from nominatim.db.connection import connect, get_pg_env from nominatim.db.async_connection import DBConnection from nominatim.db.sql_preprocessor import SQLPreprocessor from nominatim.tools.exec_utils import run_osm2pgsql from nominatim.errors import UsageError from nominatim.version import POSTGRESQL_REQUIRED_VERSION, POSTGIS_REQUIRED_VERSION LOG = logging.getLogger() def _require_version(module, actual, expected): """ Compares the version for the given module and raises an exception if the actual version is too old. """ if actual < expected: LOG.fatal('Minimum supported version of %s is %d.%d. ' 'Found version %d.%d.', module, expected[0], expected[1], actual[0], actual[1]) raise UsageError(f'{module} is too old.') def setup_database_skeleton(dsn, rouser=None): """ Create a new database for Nominatim and populate it with the essential extensions. The function fails when the database already exists or Postgresql or PostGIS versions are too old. Uses `createdb` to create the database. If 'rouser' is given, then the function also checks that the user with that given name exists. Requires superuser rights by the caller. """ proc = subprocess.run(['createdb'], env=get_pg_env(dsn), check=False) if proc.returncode != 0: raise UsageError('Creating new database failed.') with connect(dsn) as conn: _require_version('PostgreSQL server', conn.server_version_tuple(), POSTGRESQL_REQUIRED_VERSION) if rouser is not None: with conn.cursor() as cur: cnt = cur.scalar('SELECT count(*) FROM pg_user where usename = %s', (rouser, )) if cnt == 0: LOG.fatal("Web user '%s' does not exists. Create it with:\n" "\n createuser %s", rouser, rouser) raise UsageError('Missing read-only user.') # Create extensions. with conn.cursor() as cur: cur.execute('CREATE EXTENSION IF NOT EXISTS hstore') cur.execute('CREATE EXTENSION IF NOT EXISTS postgis') conn.commit() _require_version('PostGIS', conn.postgis_version_tuple(), POSTGIS_REQUIRED_VERSION) def import_osm_data(osm_files, options, drop=False, ignore_errors=False): """ Import the given OSM files. 'options' contains the list of default settings for osm2pgsql. """ options['import_file'] = osm_files options['append'] = False options['threads'] = 1 if not options['flatnode_file'] and options['osm2pgsql_cache'] == 0: # Make some educated guesses about cache size based on the size # of the import file and the available memory. mem = psutil.virtual_memory() fsize = 0 if isinstance(osm_files, list): for fname in osm_files: fsize += os.stat(str(fname)).st_size else: fsize = os.stat(str(osm_files)).st_size options['osm2pgsql_cache'] = int(min((mem.available + mem.cached) * 0.75, fsize * 2) / 1024 / 1024) + 1 run_osm2pgsql(options) with connect(options['dsn']) as conn: if not ignore_errors: with conn.cursor() as cur: cur.execute('SELECT * FROM place LIMIT 1') if cur.rowcount == 0: raise UsageError('No data imported by osm2pgsql.') if drop: conn.drop_table('planet_osm_nodes') if drop and options['flatnode_file']: Path(options['flatnode_file']).unlink() def create_tables(conn, config, reverse_only=False): """ Create the set of basic tables. When `reverse_only` is True, then the main table for searching will be skipped and only reverse search is possible. """ sql = SQLPreprocessor(conn, config) sql.env.globals['db']['reverse_only'] = reverse_only sql.run_sql_file(conn, 'tables.sql') def create_table_triggers(conn, config): """ Create the triggers for the tables. The trigger functions must already have been imported with refresh.create_functions(). """ sql = SQLPreprocessor(conn, config) sql.run_sql_file(conn, 'table-triggers.sql') def create_partition_tables(conn, config): """ Create tables that have explicit partitioning. """ sql = SQLPreprocessor(conn, config) sql.run_sql_file(conn, 'partition-tables.src.sql') def truncate_data_tables(conn): """ Truncate all data tables to prepare for a fresh load. """ with conn.cursor() as cur: cur.execute('TRUNCATE placex') cur.execute('TRUNCATE place_addressline') cur.execute('TRUNCATE location_area') cur.execute('TRUNCATE location_area_country') cur.execute('TRUNCATE location_property_tiger') cur.execute('TRUNCATE location_property_osmline') cur.execute('TRUNCATE location_postcode') if conn.table_exists('search_name'): cur.execute('TRUNCATE search_name') cur.execute('DROP SEQUENCE IF EXISTS seq_place') cur.execute('CREATE SEQUENCE seq_place start 100000') cur.execute("""SELECT tablename FROM pg_tables WHERE tablename LIKE 'location_road_%'""") for table in [r[0] for r in list(cur)]: cur.execute('TRUNCATE ' + table) conn.commit() _COPY_COLUMNS = pysql.SQL(',').join(map(pysql.Identifier, ('osm_type', 'osm_id', 'class', 'type', 'name', 'admin_level', 'address', 'extratags', 'geometry'))) def load_data(dsn, threads): """ Copy data into the word and placex table. """ sel = selectors.DefaultSelector() # Then copy data from place to placex in chunks. place_threads = max(1, threads - 1) for imod in range(place_threads): conn = DBConnection(dsn) conn.connect() conn.perform( pysql.SQL("""INSERT INTO placex ({columns}) SELECT {columns} FROM place WHERE osm_id % {total} = {mod} AND NOT (class='place' and (type='houses' or type='postcode')) AND ST_IsValid(geometry) """).format(columns=_COPY_COLUMNS, total=pysql.Literal(place_threads), mod=pysql.Literal(imod))) sel.register(conn, selectors.EVENT_READ, conn) # Address interpolations go into another table. conn = DBConnection(dsn) conn.connect() conn.perform("""INSERT INTO location_property_osmline (osm_id, address, linegeo) SELECT osm_id, address, geometry FROM place WHERE class='place' and type='houses' and osm_type='W' and ST_GeometryType(geometry) = 'ST_LineString' """) sel.register(conn, selectors.EVENT_READ, conn) # Now wait for all of them to finish. todo = place_threads + 1 while todo > 0: for key, _ in sel.select(1): conn = key.data sel.unregister(conn) conn.wait() conn.close() todo -= 1 print('.', end='', flush=True) print('\n') with connect(dsn) as conn: with conn.cursor() as cur: cur.execute('ANALYSE') def create_search_indices(conn, config, drop=False): """ Create tables that have explicit partitioning. """ # If index creation failed and left an index invalid, they need to be # cleaned out first, so that the script recreates them. with conn.cursor() as cur: cur.execute("""SELECT relname FROM pg_class, pg_index WHERE pg_index.indisvalid = false AND pg_index.indexrelid = pg_class.oid""") bad_indices = [row[0] for row in list(cur)] for idx in bad_indices: LOG.info("Drop invalid index %s.", idx) cur.execute('DROP INDEX "{}"'.format(idx)) conn.commit() sql = SQLPreprocessor(conn, config) sql.run_sql_file(conn, 'indices.sql', drop=drop)