Merge pull request #1923 from lonvia/split-indexing-for-boundries

Rework indexing order of places
This commit is contained in:
Sarah Hoffmann 2020-08-20 15:03:29 +02:00 committed by GitHub
commit 5b20fa7e38
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 227 additions and 129 deletions

View File

@ -8,6 +8,23 @@ SQL statements should be executed from the PostgreSQL commandline. Execute
## 3.5.0 -> master
### Change order during indexing
Places are now reindexed in a different order during updates, which requires
a different database index. Create it with the following SQL command:
```sql
CREATE INDEX idx_placex_pendingsector_rank_address
ON placex USING BTREE (rank_address, geometry_sector) where indexed_status > 0;
```
You can then drop the old index with:
```sql
DROP INDEX idx_placex_pendingsector;
```
### Switching to dotenv
As part of the work changing the configuration format, the configuration for

View File

@ -566,19 +566,27 @@ class SetupFunctions
info('Index ranks 0 - 4');
$oCmd = (clone $oBaseCmd)->addParams('--maxrank', 4);
echo $oCmd->escapedCmd();
$iStatus = $oCmd->run();
if ($iStatus != 0) {
fail('error status ' . $iStatus . ' running nominatim!');
}
if (!$bIndexNoanalyse) $this->pgsqlRunScript('ANALYSE');
info('Index administrative boundaries');
$oCmd = (clone $oBaseCmd)->addParams('-b');
$iStatus = $oCmd->run();
if ($iStatus != 0) {
fail('error status ' . $iStatus . ' running nominatim!');
}
info('Index ranks 5 - 25');
$oCmd = (clone $oBaseCmd)->addParams('--minrank', 5, '--maxrank', 25);
$iStatus = $oCmd->run();
if ($iStatus != 0) {
fail('error status ' . $iStatus . ' running nominatim!');
}
if (!$bIndexNoanalyse) $this->pgsqlRunScript('ANALYSE');
info('Index ranks 26 - 30');

112
nominatim/indexer/db.py Normal file
View File

@ -0,0 +1,112 @@
# SPDX-License-Identifier: GPL-2.0-only
#
# This file is part of Nominatim.
# Copyright (C) 2020 Sarah Hoffmann
import logging
import psycopg2
from psycopg2.extras import wait_select
log = logging.getLogger()
def make_connection(options, asynchronous=False):
    """ Open a new psycopg2 connection to the database described by
        `options`.

        The connection parameters are read from the attributes `dbname`,
        `user`, `password`, `host` and `port` of `options`. When
        `asynchronous` is set, the connection is requested in psycopg2's
        asynchronous mode.
    """
    credential_fields = ('dbname', 'user', 'password', 'host', 'port')
    connect_args = {field: getattr(options, field)
                    for field in credential_fields}
    # 'async' cannot be written as a keyword argument because it is a
    # reserved word in newer Python versions, so it is passed via the dict.
    connect_args['async'] = asynchronous

    return psycopg2.connect(**connect_args)
class DBConnection(object):
    """ A single non-blocking database connection.

        The class keeps one asynchronous connection together with one
        cursor and remembers the query sent last, so that the query can
        be sent again when the server aborts it because of a deadlock.
    """

    # SQLSTATE reported by PostgreSQL when a deadlock was detected.
    _DEADLOCK_PGCODE = '40P01'

    def __init__(self, options):
        """ Remember the connection `options` and open the connection.
        """
        # Query and parameters of the statement currently in flight.
        # `current_query` is None whenever the connection is idle.
        self.current_query = None
        self.current_params = None
        self.options = options
        self.conn = None
        self.cursor = None
        self.connect()

    def connect(self):
        """ (Re)connect to the database. Creates an asynchronous connection
            with JIT and parallel processing disabled. If a connection was
            already open, it is closed and a new connection established.
            The caller must ensure that no query is pending before reconnecting.
        """
        if self.conn is not None:
            self.cursor.close()
            self.conn.close()

        self.conn = make_connection(self.options, asynchronous=True)
        # An asynchronous connection must be polled until it is ready
        # before a cursor can be used on it.
        self.wait()

        self.cursor = self.conn.cursor()
        # Disable JIT and parallel workers as they are known to cause problems.
        # Update pg_settings instead of using SET because it does not yield
        # errors on older versions of Postgres where the settings are not
        # implemented.
        self.perform(
            """ UPDATE pg_settings SET setting = -1 WHERE name = 'jit_above_cost';
                UPDATE pg_settings SET setting = 0
                   WHERE name = 'max_parallel_workers_per_gather';""")
        self.wait()

    def wait(self):
        """ Block until any pending operation is done.

            A query that was aborted by a deadlock is sent again and
            waited for. Any other transaction rollback error is re-raised.
        """
        while True:
            try:
                wait_select(self.conn)
                self.current_query = None
                return
            except psycopg2.extensions.TransactionRollbackError as error:
                # psycopg2.errors.DeadlockDetected is a subclass of
                # TransactionRollbackError, so this handler covers it.
                if error.pgcode != self._DEADLOCK_PGCODE:
                    raise
                self._resend_after_deadlock()

    def perform(self, sql, args=None):
        """ Send SQL query to the server. Returns immediately without
            blocking.
        """
        self.current_query = sql
        self.current_params = args
        self.cursor.execute(sql, args)

    def fileno(self):
        """ File descriptor to wait for. (Makes this class select()able.)
        """
        return self.conn.fileno()

    def is_done(self):
        """ Check if the connection is available for a new query.

            Also checks if the previous query has run into a deadlock.
            If so, then the previous query is repeated.

            Returns True when no query is pending (any more) and False
            when the connection is still busy or the query had to be
            sent again. Transaction rollback errors other than a
            deadlock are re-raised.
        """
        if self.current_query is None:
            return True

        try:
            if self.conn.poll() == psycopg2.extensions.POLL_OK:
                self.current_query = None
                return True
        except psycopg2.extensions.TransactionRollbackError as error:
            # Also catches psycopg2.errors.DeadlockDetected (a subclass).
            if error.pgcode != self._DEADLOCK_PGCODE:
                raise
            self._resend_after_deadlock()

        return False

    def _resend_after_deadlock(self):
        """ Log the deadlock and send the remembered query once more.
        """
        log.info("Deadlock detected (params = {}), retry."
                 .format(self.current_params))
        self.cursor.execute(self.current_query, self.current_params)

View File

@ -28,25 +28,13 @@ import sys
import re
import getpass
from datetime import datetime
import psycopg2
from psycopg2.extras import wait_select
import select
from indexer.progress import ProgressLogger
from indexer.db import DBConnection, make_connection
log = logging.getLogger()
def make_connection(options, asynchronous=False):
params = {'dbname' : options.dbname,
'user' : options.user,
'password' : options.password,
'host' : options.host,
'port' : options.port,
'async' : asynchronous}
return psycopg2.connect(**params)
class RankRunner(object):
""" Returns SQL commands for indexing one rank within the placex table.
"""
@ -59,12 +47,12 @@ class RankRunner(object):
def sql_count_objects(self):
return """SELECT count(*) FROM placex
WHERE rank_search = {} and indexed_status > 0
WHERE rank_address = {} and indexed_status > 0
""".format(self.rank)
def sql_get_objects(self):
return """SELECT place_id FROM placex
WHERE indexed_status > 0 and rank_search = {}
WHERE indexed_status > 0 and rank_address = {}
ORDER BY geometry_sector""".format(self.rank)
def sql_index_place(self, ids):
@ -94,113 +82,62 @@ class InterpolationRunner(object):
SET indexed_status = 0 WHERE place_id IN ({})"""\
.format(','.join((str(i) for i in ids)))
class DBConnection(object):
""" A single non-blocking database connection.
class BoundaryRunner(object):
""" Returns SQL commands for indexing the administrative boundaries
of a certain rank.
"""
def __init__(self, options):
self.current_query = None
self.current_params = None
def __init__(self, rank):
self.rank = rank
self.conn = None
self.connect()
def name(self):
return "boundaries rank {}".format(self.rank)
def connect(self):
if self.conn is not None:
self.cursor.close()
self.conn.close()
def sql_count_objects(self):
return """SELECT count(*) FROM placex
WHERE indexed_status > 0
AND rank_search = {}
AND class = 'boundary' and type = 'administrative'""".format(self.rank)
self.conn = make_connection(options, asynchronous=True)
self.wait()
self.cursor = self.conn.cursor()
# Disable JIT and parallel workers as they are known to cause problems.
# Update pg_settings instead of using SET because it does not yield
# errors on older versions of Postgres where the settings are not
# implemented.
self.perform(
""" UPDATE pg_settings SET setting = -1 WHERE name = 'jit_above_cost';
UPDATE pg_settings SET setting = 0
WHERE name = 'max_parallel_workers_per_gather';""")
self.wait()
def wait(self):
""" Block until any pending operation is done.
"""
while True:
try:
wait_select(self.conn)
self.current_query = None
return
except psycopg2.extensions.TransactionRollbackError as e:
if e.pgcode == '40P01':
log.info("Deadlock detected (params = {}), retry."
.format(self.current_params))
self.cursor.execute(self.current_query, self.current_params)
else:
raise
except psycopg2.errors.DeadlockDetected:
self.cursor.execute(self.current_query, self.current_params)
def perform(self, sql, args=None):
""" Send SQL query to the server. Returns immediately without
blocking.
"""
self.current_query = sql
self.current_params = args
self.cursor.execute(sql, args)
def fileno(self):
""" File descriptor to wait for. (Makes this class select()able.)
"""
return self.conn.fileno()
def is_done(self):
""" Check if the connection is available for a new query.
Also checks if the previous query has run into a deadlock.
If so, then the previous query is repeated.
"""
if self.current_query is None:
return True
try:
if self.conn.poll() == psycopg2.extensions.POLL_OK:
self.current_query = None
return True
except psycopg2.extensions.TransactionRollbackError as e:
if e.pgcode == '40P01':
log.info("Deadlock detected (params = {}), retry.".format(self.current_params))
self.cursor.execute(self.current_query, self.current_params)
else:
raise
except psycopg2.errors.DeadlockDetected:
self.cursor.execute(self.current_query, self.current_params)
return False
def sql_get_objects(self):
return """SELECT place_id FROM placex
WHERE indexed_status > 0 and rank_search = {}
and class = 'boundary' and type = 'administrative'
ORDER BY partition, admin_level""".format(self.rank)
def sql_index_place(self, ids):
return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\
.format(','.join((str(i) for i in ids)))
class Indexer(object):
""" Main indexing routine.
"""
def __init__(self, options):
self.minrank = max(0, options.minrank)
self.minrank = max(1, options.minrank)
self.maxrank = min(30, options.maxrank)
self.conn = make_connection(options)
self.threads = [DBConnection(options) for i in range(options.threads)]
def run(self):
""" Run indexing over the entire database.
def index_boundaries(self):
log.warning("Starting indexing boundaries using {} threads".format(
len(self.threads)))
for rank in range(max(self.minrank, 5), min(self.maxrank, 26)):
self.index(BoundaryRunner(rank))
def index_by_rank(self):
""" Run classic indexing by rank.
"""
log.warning("Starting indexing rank ({} to {}) using {} threads".format(
self.minrank, self.maxrank, len(self.threads)))
for rank in range(self.minrank, self.maxrank):
for rank in range(max(1, self.minrank), self.maxrank):
self.index(RankRunner(rank))
if self.maxrank == 30:
self.index(RankRunner(0), 20)
self.index(InterpolationRunner(), 20)
self.index(RankRunner(self.maxrank), 20)
@ -220,27 +157,28 @@ class Indexer(object):
cur.close()
next_thread = self.find_free_thread()
progress = ProgressLogger(obj.name(), total_tuples)
cur = self.conn.cursor(name='places')
cur.execute(obj.sql_get_objects())
if total_tuples > 0:
cur = self.conn.cursor(name='places')
cur.execute(obj.sql_get_objects())
while True:
places = [p[0] for p in cur.fetchmany(batch)]
if len(places) == 0:
break
next_thread = self.find_free_thread()
while True:
places = [p[0] for p in cur.fetchmany(batch)]
if len(places) == 0:
break
log.debug("Processing places: {}".format(places))
thread = next(next_thread)
log.debug("Processing places: {}".format(places))
thread = next(next_thread)
thread.perform(obj.sql_index_place(places))
progress.add(len(places))
thread.perform(obj.sql_index_place(places))
progress.add(len(places))
cur.close()
cur.close()
for t in self.threads:
t.wait()
for t in self.threads:
t.wait()
progress.done()
@ -296,6 +234,9 @@ def nominatim_arg_parser():
p.add_argument('-P', '--port',
dest='port', action='store',
help='PostgreSQL server port')
p.add_argument('-b', '--boundary-only',
dest='boundary_only', action='store_true',
help='Only index administrative boundaries (ignores min/maxrank).')
p.add_argument('-r', '--minrank',
dest='minrank', type=int, metavar='RANK', default=0,
help='Minimum/starting rank.')
@ -323,4 +264,7 @@ if __name__ == '__main__':
password = getpass.getpass("Database password: ")
options.password = password
Indexer(options).run()
if options.boundary_only:
Indexer(options).index_boundaries()
else:
Indexer(options).index_by_rank()

View File

@ -207,16 +207,22 @@ CREATE OR REPLACE FUNCTION addr_ids_from_name(lookup_word TEXT)
AS $$
DECLARE
lookup_token TEXT;
id INTEGER;
return_word_id INTEGER[];
BEGIN
lookup_token := make_standard_name(lookup_word);
SELECT array_agg(word_id) FROM word
WHERE word_token = lookup_token and class is null and type is null
INTO return_word_id;
IF return_word_id IS NULL THEN
id := nextval('seq_word');
INSERT INTO word VALUES (id, lookup_token, null, null, null, null, 0);
return_word_id = ARRAY[id];
END IF;
RETURN return_word_id;
END;
$$
LANGUAGE plpgsql STABLE;
LANGUAGE plpgsql;
-- Normalize a string and look up its name ids (full words).

View File

@ -581,6 +581,11 @@ BEGIN
RETURN NEW;
END IF;
-- Speed up searches - just use the centroid of the feature
-- cheaper but less accurate
NEW.centroid := ST_PointOnSurface(NEW.geometry);
--DEBUG: RAISE WARNING 'Computing preliminary centroid at %',ST_AsText(NEW.centroid);
-- recompute the ranks, they might change when linking changes
SELECT * INTO NEW.rank_search, NEW.rank_address
FROM compute_place_rank(NEW.country_code,
@ -591,8 +596,8 @@ BEGIN
(NEW.extratags->'capital') = 'yes',
NEW.address->'postcode');
-- We must always increase the address level relative to the admin boundary.
IF NEW.class = 'boundary' and NEW.type = 'administrative' THEN
parent_address_level := get_parent_address_level(NEW.geometry, NEW.admin_level);
IF NEW.class = 'boundary' and NEW.type = 'administrative' and NEW.osm_type = 'R' THEN
parent_address_level := get_parent_address_level(NEW.centroid, NEW.admin_level);
IF parent_address_level >= NEW.rank_address THEN
IF parent_address_level >= 24 THEN
NEW.rank_address := 25;
@ -632,11 +637,6 @@ BEGIN
END IF;
END IF;
-- Speed up searches - just use the centroid of the feature
-- cheaper but less accurate
NEW.centroid := ST_PointOnSurface(NEW.geometry);
--DEBUG: RAISE WARNING 'Computing preliminary centroid at %',ST_AsText(NEW.centroid);
NEW.postcode := null;
-- recalculate country and partition

View File

@ -1,7 +1,7 @@
-- Indices used only during search and update.
-- These indices are created only after the indexing process is done.
CREATE INDEX CONCURRENTLY idx_placex_pendingsector ON placex USING BTREE (rank_search,geometry_sector) {ts:address-index} where indexed_status > 0;
CREATE INDEX CONCURRENTLY idx_placex_pendingsector ON placex USING BTREE (rank_address,geometry_sector) {ts:address-index} where indexed_status > 0;
CREATE INDEX CONCURRENTLY idx_location_area_country_place_id ON location_area_country USING BTREE (place_id) {ts:address-index};

View File

@ -39,13 +39,13 @@ Feature: Creation of search terms
| object | nameaddress_vector |
| W1 | bonn, new york, smalltown |
Scenario: A known addr:* tag is not added if the name is unknown
Scenario: A known addr:* tag is added even if the name is unknown
Given the scene roads-with-pois
And the places
| osm | class | type | name | addr+city | geometry |
| W1 | highway | residential | Road | Nandu | :w-north |
When importing
Then search_name contains not
Then search_name contains
| object | nameaddress_vector |
| W1 | nandu |

View File

@ -278,9 +278,11 @@ if ($aResult['recompute-word-counts']) {
if ($aResult['index']) {
$oCmd = (clone $oIndexCmd)
->addParams('--minrank', $aResult['index-rank']);
->addParams('--minrank', $aResult['index-rank'], '-b');
$oCmd->run();
// echo $oCmd->escapedCmd()."\n";
$oCmd = (clone $oIndexCmd)
->addParams('--minrank', $aResult['index-rank']);
$oCmd->run();
$oDB->exec('update import_status set indexed = true');
@ -421,9 +423,18 @@ if ($aResult['import-osmosis'] || $aResult['import-osmosis-all']) {
// Index file
if (!$aResult['no-index']) {
$oThisIndexCmd = clone($oIndexCmd);
$fCMDStartTime = time();
$oThisIndexCmd = clone($oIndexCmd);
$oThisIndexCmd->addParams('-b');
echo $oThisIndexCmd->escapedCmd()."\n";
$iErrorLevel = $oThisIndexCmd->run();
if ($iErrorLevel) {
echo "Error: $iErrorLevel\n";
exit($iErrorLevel);
}
$oThisIndexCmd = clone($oIndexCmd);
echo $oThisIndexCmd->escapedCmd()."\n";
$iErrorLevel = $oThisIndexCmd->run();
if ($iErrorLevel) {