diff --git a/nominatim/db/async_connection.py b/nominatim/db/async_connection.py index 361fe075..a4f55496 100644 --- a/nominatim/db/async_connection.py +++ b/nominatim/db/async_connection.py @@ -48,14 +48,14 @@ class DBConnection: """ A single non-blocking database connection. """ - def __init__(self, dsn): + def __init__(self, dsn, cursor_factory=None): self.current_query = None self.current_params = None self.dsn = dsn self.conn = None self.cursor = None - self.connect() + self.connect(cursor_factory=cursor_factory) def close(self): """ Close all open connections. Does not wait for pending requests. @@ -66,7 +66,7 @@ class DBConnection: self.conn = None - def connect(self): + def connect(self, cursor_factory=None): """ (Re)connect to the database. Creates an asynchronous connection with JIT and parallel processing disabled. If a connection was already open, it is closed and a new connection established. @@ -79,7 +79,7 @@ class DBConnection: self.conn = psycopg2.connect(**{'dsn' : self.dsn, 'async' : True}) self.wait() - self.cursor = self.conn.cursor() + self.cursor = self.conn.cursor(cursor_factory=cursor_factory) # Disable JIT and parallel workers as they are known to cause problems. # Update pg_settings instead of using SET because it does not yield # errors on older versions of Postgres where the settings are not diff --git a/nominatim/indexer/indexer.py b/nominatim/indexer/indexer.py index 41535af8..d685e83a 100644 --- a/nominatim/indexer/indexer.py +++ b/nominatim/indexer/indexer.py @@ -3,6 +3,7 @@ Main work horse for indexing (computing addresses) the database. """ import logging import select +import time import psycopg2.extras @@ -183,6 +184,8 @@ class Indexer: total_tuples = cur.scalar(runner.sql_count_objects()) LOG.debug("Total number of rows: %i", total_tuples) + # need to fetch those manually because register_hstore cannot + # fetch them on an asynchronous connection below. hstore_oid = cur.scalar("SELECT 'hstore'::regtype::oid") hstore_array_oid = cur.scalar("SELECT 'hstore[]'::regtype::oid") @@ -190,11 +193,14 @@ class Indexer: progress = ProgressLogger(runner.name(), total_tuples) + fetcher_wait = 0 + pool_wait = 0 + if total_tuples > 0: with conn.cursor(name='places') as cur: cur.execute(runner.sql_get_objects()) - fetcher = DBConnection(self.dsn) + fetcher = DBConnection(self.dsn, cursor_factory=psycopg2.extras.DictCursor) psycopg2.extras.register_hstore(fetcher.conn, oid=hstore_oid, array_oid=hstore_array_oid) @@ -203,7 +209,9 @@ class Indexer: places = self._fetch_next_batch(cur, fetcher, runner) while places is not None: if not places: + t0 = time.time() fetcher.wait() + fetcher_wait += time.time() - t0 places = fetcher.cursor.fetchall() # asynchronously get the next batch @@ -211,7 +219,9 @@ class Indexer: # And insert the curent batch for idx in range(0, len(places), batch): + t0 = time.time() worker = pool.next_free_worker() + pool_wait += time.time() - t0 part = places[idx:idx+batch] LOG.debug("Processing places: %s", str(part)) runner.index_places(worker, part) @@ -227,10 +237,11 @@ class Indexer: conn.commit() progress.done() + LOG.warning("Wait time: fetcher: {}s, pool: {}s".format(fetcher_wait, pool_wait)) def _fetch_next_batch(self, cur, fetcher, runner): - ids = cur.fetchmany(1000) + ids = cur.fetchmany(100) if not ids: return None diff --git a/nominatim/indexer/runners.py b/nominatim/indexer/runners.py index 75429fe4..459f8004 100644 --- a/nominatim/indexer/runners.py +++ b/nominatim/indexer/runners.py @@ -11,7 +11,7 @@ import psycopg2.extras class AbstractPlacexRunner: """ Returns SQL commands for indexing of the placex table. """ - SELECT_SQL = 'SELECT place_id, (placex_prepare_update(placex)).* FROM placex' + SELECT_SQL = 'SELECT place_id FROM placex' def __init__(self, rank, analyzer): self.rank = rank @@ -28,6 +28,12 @@ class AbstractPlacexRunner: """.format(','.join(["(%s, %s::hstore, %s::jsonb)"] * num_places)) + def get_place_details(self, worker, ids): + worker.perform("""SELECT place_id, (placex_prepare_update(placex)).* + FROM placex WHERE place_id IN %s""", + (tuple((p[0] for p in ids)), )) + + def index_places(self, worker, places): values = [] for place in places: