Merge pull request #2285 from lonvia/split-indexer-code

Rework indexer code
This commit is contained in:
Sarah Hoffmann 2021-04-20 15:34:14 +02:00 committed by GitHub
commit 696c50459f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 239 additions and 223 deletions

View File

@@ -1,129 +1,77 @@
"""
Main work horse for indexing (computing addresses) the database.
"""
# pylint: disable=C0111
import logging
import select
import psycopg2
from nominatim.indexer.progress import ProgressLogger
from nominatim.indexer import runners
from nominatim.db.async_connection import DBConnection
from nominatim.db.connection import connect
LOG = logging.getLogger()
class RankRunner:
""" Returns SQL commands for indexing one rank within the placex table.
class WorkerPool:
""" A pool of asynchronous database connections.
The pool may be used as a context manager.
"""
REOPEN_CONNECTIONS_AFTER = 100000
def __init__(self, rank):
self.rank = rank
def name(self):
return "rank {}".format(self.rank)
def sql_count_objects(self):
return """SELECT count(*) FROM placex
WHERE rank_address = {} and indexed_status > 0
""".format(self.rank)
def sql_get_objects(self):
return """SELECT place_id FROM placex
WHERE indexed_status > 0 and rank_address = {}
ORDER BY geometry_sector""".format(self.rank)
@staticmethod
def sql_index_place(ids):
return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\
.format(','.join((str(i) for i in ids)))
def __init__(self, dsn, pool_size):
self.threads = [DBConnection(dsn) for _ in range(pool_size)]
self.free_workers = self._yield_free_worker()
class InterpolationRunner:
""" Returns SQL commands for indexing the address interpolation table
location_property_osmline.
"""
def finish_all(self):
""" Wait for all connection to finish.
"""
for thread in self.threads:
while not thread.is_done():
thread.wait()
@staticmethod
def name():
return "interpolation lines (location_property_osmline)"
self.free_workers = self._yield_free_worker()
@staticmethod
def sql_count_objects():
return """SELECT count(*) FROM location_property_osmline
WHERE indexed_status > 0"""
@staticmethod
def sql_get_objects():
return """SELECT place_id FROM location_property_osmline
WHERE indexed_status > 0
ORDER BY geometry_sector"""
@staticmethod
def sql_index_place(ids):
return """UPDATE location_property_osmline
SET indexed_status = 0 WHERE place_id IN ({})
""".format(','.join((str(i) for i in ids)))
class BoundaryRunner:
""" Returns SQL commands for indexing the administrative boundaries
of a certain rank.
"""
def __init__(self, rank):
self.rank = rank
def name(self):
return "boundaries rank {}".format(self.rank)
def sql_count_objects(self):
return """SELECT count(*) FROM placex
WHERE indexed_status > 0
AND rank_search = {}
AND class = 'boundary' and type = 'administrative'
""".format(self.rank)
def sql_get_objects(self):
return """SELECT place_id FROM placex
WHERE indexed_status > 0 and rank_search = {}
and class = 'boundary' and type = 'administrative'
ORDER BY partition, admin_level
""".format(self.rank)
@staticmethod
def sql_index_place(ids):
return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\
.format(','.join((str(i) for i in ids)))
def close(self):
""" Close all connections and clear the pool.
"""
for thread in self.threads:
thread.close()
self.threads = []
self.free_workers = None
class PostcodeRunner:
""" Provides the SQL commands for indexing the location_postcode table.
"""
@staticmethod
def name():
return "postcodes (location_postcode)"
@staticmethod
def sql_count_objects():
return 'SELECT count(*) FROM location_postcode WHERE indexed_status > 0'
@staticmethod
def sql_get_objects():
return """SELECT place_id FROM location_postcode
WHERE indexed_status > 0
ORDER BY country_code, postcode"""
@staticmethod
def sql_index_place(ids):
return """UPDATE location_postcode SET indexed_status = 0
WHERE place_id IN ({})
""".format(','.join((str(i) for i in ids)))
def next_free_worker(self):
""" Get the next free connection.
"""
return next(self.free_workers)
def _analyse_db_if(conn, condition):
if condition:
with conn.cursor() as cur:
cur.execute('ANALYSE')
def _yield_free_worker(self):
    """ Generator that endlessly yields the next worker connection
        that is free to accept a query.

        Blocks on select() until at least one connection is writable.
        After REOPEN_CONNECTIONS_AFTER served commands all connections
        are drained and reopened.
    """
    ready = self.threads
    command_stat = 0
    while True:
        for thread in ready:
            if thread.is_done():
                command_stat += 1
                yield thread

        if command_stat > self.REOPEN_CONNECTIONS_AFTER:
            # Reconnect occasionally to avoid potential memory leaks
            # in PostgreSQL. Wait for each connection to finish its
            # current statement before reopening it.
            for thread in self.threads:
                while not thread.is_done():
                    thread.wait()
                thread.connect()
            ready = self.threads
            # Restart the counter; without this reset every following
            # pass would immediately reconnect all workers again.
            command_stat = 0
        else:
            # Wait until at least one connection is ready for a query.
            _, ready, _ = select.select([], self.threads, [])
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
class Indexer:
@ -133,23 +81,6 @@ class Indexer:
def __init__(self, dsn, num_threads):
self.dsn = dsn
self.num_threads = num_threads
self.conn = None
self.threads = []
def _setup_connections(self):
self.conn = psycopg2.connect(self.dsn)
self.threads = [DBConnection(self.dsn) for _ in range(self.num_threads)]
def _close_connections(self):
if self.conn:
self.conn.close()
self.conn = None
for thread in self.threads:
thread.close()
self.threads = []
def index_full(self, analyse=True):
@ -158,26 +89,31 @@ class Indexer:
database will be analysed at the appropriate places to
ensure that database statistics are updated.
"""
conn = psycopg2.connect(self.dsn)
conn.autocommit = True
with connect(self.dsn) as conn:
conn.autocommit = True
if analyse:
def _analyze():
with conn.cursor() as cur:
cur.execute('ANALYZE')
else:
def _analyze():
pass
try:
self.index_by_rank(0, 4)
_analyse_db_if(conn, analyse)
_analyze()
self.index_boundaries(0, 30)
_analyse_db_if(conn, analyse)
_analyze()
self.index_by_rank(5, 25)
_analyse_db_if(conn, analyse)
_analyze()
self.index_by_rank(26, 30)
_analyse_db_if(conn, analyse)
_analyze()
self.index_postcodes()
_analyse_db_if(conn, analyse)
finally:
conn.close()
_analyze()
def index_boundaries(self, minrank, maxrank):
@ -186,13 +122,8 @@ class Indexer:
LOG.warning("Starting indexing boundaries using %s threads",
self.num_threads)
self._setup_connections()
try:
for rank in range(max(minrank, 4), min(maxrank, 26)):
self.index(BoundaryRunner(rank))
finally:
self._close_connections()
for rank in range(max(minrank, 4), min(maxrank, 26)):
self._index(runners.BoundaryRunner(rank))
def index_by_rank(self, minrank, maxrank):
""" Index all entries of placex in the given rank range (inclusive)
@ -205,20 +136,15 @@ class Indexer:
LOG.warning("Starting indexing rank (%i to %i) using %i threads",
minrank, maxrank, self.num_threads)
self._setup_connections()
for rank in range(max(1, minrank), maxrank):
self._index(runners.RankRunner(rank))
try:
for rank in range(max(1, minrank), maxrank):
self.index(RankRunner(rank))
if maxrank == 30:
self.index(RankRunner(0))
self.index(InterpolationRunner(), 20)
self.index(RankRunner(30), 20)
else:
self.index(RankRunner(maxrank))
finally:
self._close_connections()
if maxrank == 30:
self._index(runners.RankRunner(0))
self._index(runners.InterpolationRunner(), 20)
self._index(runners.RankRunner(30), 20)
else:
self._index(runners.RankRunner(maxrank))
def index_postcodes(self):
@ -226,89 +152,52 @@ class Indexer:
"""
LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)
self._setup_connections()
self._index(runners.PostcodeRunner(), 20)
try:
self.index(PostcodeRunner(), 20)
finally:
self._close_connections()
def update_status_table(self):
""" Update the status in the status table to 'indexed'.
"""
conn = psycopg2.connect(self.dsn)
try:
with connect(self.dsn) as conn:
with conn.cursor() as cur:
cur.execute('UPDATE import_status SET indexed = true')
conn.commit()
finally:
conn.close()
def index(self, obj, batch=1):
""" Index a single rank or table. `obj` describes the SQL to use
def _index(self, runner, batch=1):
""" Index a single rank or table. `runner` describes the SQL to use
for indexing. `batch` describes the number of objects that
should be processed with a single SQL statement
"""
LOG.warning("Starting %s (using batch size %s)", obj.name(), batch)
LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)
cur = self.conn.cursor()
cur.execute(obj.sql_count_objects())
with connect(self.dsn) as conn:
with conn.cursor() as cur:
total_tuples = cur.scalar(runner.sql_count_objects())
LOG.debug("Total number of rows: %i", total_tuples)
total_tuples = cur.fetchone()[0]
LOG.debug("Total number of rows: %i", total_tuples)
conn.commit()
cur.close()
progress = ProgressLogger(runner.name(), total_tuples)
progress = ProgressLogger(obj.name(), total_tuples)
if total_tuples > 0:
with conn.cursor(name='places') as cur:
cur.execute(runner.sql_get_objects())
if total_tuples > 0:
cur = self.conn.cursor(name='places')
cur.execute(obj.sql_get_objects())
with WorkerPool(self.dsn, self.num_threads) as pool:
while True:
places = [p[0] for p in cur.fetchmany(batch)]
if not places:
break
next_thread = self.find_free_thread()
while True:
places = [p[0] for p in cur.fetchmany(batch)]
if not places:
break
LOG.debug("Processing places: %s", str(places))
worker = pool.next_free_worker()
LOG.debug("Processing places: %s", str(places))
thread = next(next_thread)
worker.perform(runner.sql_index_place(places))
progress.add(len(places))
thread.perform(obj.sql_index_place(places))
progress.add(len(places))
pool.finish_all()
cur.close()
for thread in self.threads:
thread.wait()
conn.commit()
progress.done()
def find_free_thread(self):
""" Generator that returns the next connection that is free for
sending a query.
"""
ready = self.threads
command_stat = 0
while True:
for thread in ready:
if thread.is_done():
command_stat += 1
yield thread
# refresh the connections occasionaly to avoid potential
# memory leaks in Postgresql.
if command_stat > 100000:
for thread in self.threads:
while not thread.is_done():
thread.wait()
thread.connect()
command_stat = 0
ready = self.threads
else:
ready, _, _ = select.select(self.threads, [], [])
assert False, "Unreachable code"

View File

@@ -0,0 +1,113 @@
"""
Mix-ins that provide the actual commands for the indexer for various indexing
tasks.
"""
# pylint: disable=C0111
class RankRunner:
    """ Returns SQL commands for indexing one rank within the placex table.
    """

    def __init__(self, rank):
        self.rank = rank

    def name(self):
        # Label used for logging and progress output.
        return f"rank {self.rank}"

    def sql_count_objects(self):
        # Number of rows of this rank still awaiting indexing.
        return f"""SELECT count(*) FROM placex
WHERE rank_address = {self.rank} and indexed_status > 0
"""

    def sql_get_objects(self):
        # Pending place_ids, sorted by sector so that geographically
        # close places are processed together.
        return f"""SELECT place_id FROM placex
WHERE indexed_status > 0 and rank_address = {self.rank}
ORDER BY geometry_sector"""

    @staticmethod
    def sql_index_place(ids):
        # Resetting indexed_status to 0 marks the given places as indexed.
        id_list = ','.join(str(i) for i in ids)
        return f"UPDATE placex SET indexed_status = 0 WHERE place_id IN ({id_list})"
class BoundaryRunner:
    """ Returns SQL commands for indexing the administrative boundaries
        of a certain rank.
    """

    def __init__(self, rank):
        self.rank = rank

    def name(self):
        # Label used for logging and progress output.
        return f"boundaries rank {self.rank}"

    def sql_count_objects(self):
        # Only administrative boundaries of this search rank are counted.
        return f"""SELECT count(*) FROM placex
WHERE indexed_status > 0
AND rank_search = {self.rank}
AND class = 'boundary' and type = 'administrative'
"""

    def sql_get_objects(self):
        # Pending boundary place_ids, ordered by partition and admin_level.
        return f"""SELECT place_id FROM placex
WHERE indexed_status > 0 and rank_search = {self.rank}
and class = 'boundary' and type = 'administrative'
ORDER BY partition, admin_level
"""

    @staticmethod
    def sql_index_place(ids):
        # Resetting indexed_status to 0 marks the given places as indexed.
        id_list = ','.join(str(i) for i in ids)
        return f"UPDATE placex SET indexed_status = 0 WHERE place_id IN ({id_list})"
class InterpolationRunner:
    """ Returns SQL commands for indexing the address interpolation table
        location_property_osmline.
    """

    @staticmethod
    def name():
        # Label used for logging and progress output.
        return "interpolation lines (location_property_osmline)"

    @staticmethod
    def sql_count_objects():
        # Number of interpolation lines still awaiting indexing.
        return """SELECT count(*) FROM location_property_osmline
WHERE indexed_status > 0"""

    @staticmethod
    def sql_get_objects():
        # Pending place_ids, sorted by sector so that geographically
        # close lines are processed together.
        return """SELECT place_id FROM location_property_osmline
WHERE indexed_status > 0
ORDER BY geometry_sector"""

    @staticmethod
    def sql_index_place(ids):
        # Resetting indexed_status to 0 marks the given lines as indexed.
        id_list = ','.join(str(i) for i in ids)
        return f"""UPDATE location_property_osmline
SET indexed_status = 0 WHERE place_id IN ({id_list})
"""
class PostcodeRunner:
    """ Provides the SQL commands for indexing the location_postcode table.
    """

    @staticmethod
    def name():
        # Label used for logging and progress output.
        return "postcodes (location_postcode)"

    @staticmethod
    def sql_count_objects():
        # Number of postcode rows still awaiting indexing.
        return 'SELECT count(*) FROM location_postcode WHERE indexed_status > 0'

    @staticmethod
    def sql_get_objects():
        # Pending place_ids, grouped by country and postcode.
        return """SELECT place_id FROM location_postcode
WHERE indexed_status > 0
ORDER BY country_code, postcode"""

    @staticmethod
    def sql_index_place(ids):
        # Resetting indexed_status to 0 marks the given postcodes as indexed.
        id_list = ','.join(str(i) for i in ids)
        return f"""UPDATE location_postcode SET indexed_status = 0
WHERE place_id IN ({id_list})
"""

View File

@@ -5,7 +5,7 @@ import itertools
import psycopg2
import pytest
from nominatim.indexer.indexer import Indexer
from nominatim.indexer import indexer
class IndexerTestDB:
@ -111,7 +111,7 @@ def test_index_all_by_rank(test_db, threads):
assert 31 == test_db.placex_unindexed()
assert 1 == test_db.osmline_unindexed()
idx = Indexer('dbname=test_nominatim_python_unittest', threads)
idx = indexer.Indexer('dbname=test_nominatim_python_unittest', threads)
idx.index_by_rank(0, 30)
assert 0 == test_db.placex_unindexed()
@ -150,7 +150,7 @@ def test_index_partial_without_30(test_db, threads):
assert 31 == test_db.placex_unindexed()
assert 1 == test_db.osmline_unindexed()
idx = Indexer('dbname=test_nominatim_python_unittest', threads)
idx = indexer.Indexer('dbname=test_nominatim_python_unittest', threads)
idx.index_by_rank(4, 15)
assert 19 == test_db.placex_unindexed()
@ -170,7 +170,7 @@ def test_index_partial_with_30(test_db, threads):
assert 31 == test_db.placex_unindexed()
assert 1 == test_db.osmline_unindexed()
idx = Indexer('dbname=test_nominatim_python_unittest', threads)
idx = indexer.Indexer('dbname=test_nominatim_python_unittest', threads)
idx.index_by_rank(28, 30)
assert 27 == test_db.placex_unindexed()
@ -191,7 +191,7 @@ def test_index_boundaries(test_db, threads):
assert 37 == test_db.placex_unindexed()
assert 1 == test_db.osmline_unindexed()
idx = Indexer('dbname=test_nominatim_python_unittest', threads)
idx = indexer.Indexer('dbname=test_nominatim_python_unittest', threads)
idx.index_boundaries(0, 30)
assert 31 == test_db.placex_unindexed()
@ -209,14 +209,15 @@ def test_index_postcodes(test_db, threads):
for postcode in range(32000, 33000):
test_db.add_postcode('us', postcode)
idx = Indexer('dbname=test_nominatim_python_unittest', threads)
idx = indexer.Indexer('dbname=test_nominatim_python_unittest', threads)
idx.index_postcodes()
assert 0 == test_db.scalar("""SELECT count(*) FROM location_postcode
WHERE indexed_status != 0""")
def test_index_full(test_db):
@pytest.mark.parametrize("analyse", [True, False])
def test_index_full(test_db, analyse):
for rank in range(4, 10):
test_db.add_admin(rank_address=rank, rank_search=rank)
for rank in range(31):
@ -225,10 +226,23 @@ def test_index_full(test_db):
for postcode in range(1000):
test_db.add_postcode('de', postcode)
idx = Indexer('dbname=test_nominatim_python_unittest', 4)
idx.index_full()
idx = indexer.Indexer('dbname=test_nominatim_python_unittest', 4)
idx.index_full(analyse=analyse)
assert 0 == test_db.placex_unindexed()
assert 0 == test_db.osmline_unindexed()
assert 0 == test_db.scalar("""SELECT count(*) FROM location_postcode
WHERE indexed_status != 0""")
@pytest.mark.parametrize("threads", [1, 15])
def test_index_reopen_connection(test_db, threads, monkeypatch):
    # Lower the recycle threshold so the pool's reconnect code path is
    # exercised after only 15 statements instead of the default 100000.
    monkeypatch.setattr(indexer.WorkerPool, "REOPEN_CONNECTIONS_AFTER", 15)

    # 1000 pending places guarantee many batches past the threshold.
    for _ in range(1000):
        test_db.add_place(rank_address=30, rank_search=30)

    idx = indexer.Indexer('dbname=test_nominatim_python_unittest', threads)
    idx.index_by_rank(28, 30)

    # All places must end up indexed despite the connection recycling.
    assert 0 == test_db.placex_unindexed()