Merge pull request #3510 from lonvia/indexing-precompute-count

Indexing: precompute counts of affected rows
This commit is contained in:
Sarah Hoffmann 2024-08-12 23:55:19 +02:00 committed by GitHub
commit 043d52821c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -7,7 +7,7 @@
"""
Main work horse for indexing (computing addresses) the database.
"""
from typing import cast, List, Any
from typing import cast, List, Any, Optional
import logging
import time
@ -83,9 +83,30 @@ class Indexer:
LOG.warning("Starting indexing boundaries using %s threads",
self.num_threads)
minrank = max(minrank, 4)
maxrank = min(maxrank, 25)
# Precompute number of rows to process for all rows
with connect(self.dsn) as conn:
hstore_info = psycopg.types.TypeInfo.fetch(conn, "hstore")
if hstore_info is None:
raise RuntimeError('Hstore extension is requested but not installed.')
psycopg.types.hstore.register_hstore(hstore_info)
with conn.cursor() as cur:
cur = conn.execute(""" SELECT rank_search, count(*)
FROM placex
WHERE rank_search between %s and %s
AND class = 'boundary' and type = 'administrative'
AND indexed_status > 0
GROUP BY rank_search""",
(minrank, maxrank))
total_tuples = {row.rank_search: row.count for row in cur}
with self.tokenizer.name_analyzer() as analyzer:
for rank in range(max(minrank, 4), min(maxrank, 26)):
total += await self._index(runners.BoundaryRunner(rank, analyzer))
for rank in range(minrank, maxrank + 1):
total += await self._index(runners.BoundaryRunner(rank, analyzer),
total_tuples=total_tuples.get(rank, 0))
return total
@ -101,6 +122,23 @@ class Indexer:
LOG.warning("Starting indexing rank (%i to %i) using %i threads",
minrank, maxrank, self.num_threads)
# Precompute number of rows to process for all rows
with connect(self.dsn) as conn:
hstore_info = psycopg.types.TypeInfo.fetch(conn, "hstore")
if hstore_info is None:
raise RuntimeError('Hstore extension is requested but not installed.')
psycopg.types.hstore.register_hstore(hstore_info)
with conn.cursor() as cur:
cur = conn.execute(""" SELECT rank_address, count(*)
FROM placex
WHERE rank_address between %s and %s
AND indexed_status > 0
GROUP BY rank_address""",
(minrank, maxrank))
total_tuples = {row.rank_address: row.count for row in cur}
with self.tokenizer.name_analyzer() as analyzer:
for rank in range(max(1, minrank), maxrank + 1):
if rank >= 30:
@ -109,11 +147,12 @@ class Indexer:
batch = 5
else:
batch = 1
total += await self._index(runners.RankRunner(rank, analyzer), batch)
total += await self._index(runners.RankRunner(rank, analyzer),
batch=batch, total_tuples=total_tuples.get(rank, 0))
if maxrank == 30:
total += await self._index(runners.RankRunner(0, analyzer))
total += await self._index(runners.InterpolationRunner(analyzer), 20)
total += await self._index(runners.InterpolationRunner(analyzer), batch=20)
return total
@ -123,7 +162,7 @@ class Indexer:
"""
LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)
return await self._index(runners.PostcodeRunner(), 20)
return await self._index(runners.PostcodeRunner(), batch=20)
def update_status_table(self) -> None:
@ -135,14 +174,20 @@ class Indexer:
conn.commit()
async def _index(self, runner: runners.Runner, batch: int = 1) -> int:
async def _index(self, runner: runners.Runner, batch: int = 1,
total_tuples: Optional[int] = None) -> int:
""" Index a single rank or table. `runner` describes the SQL to use
for indexing. `batch` describes the number of objects that
should be processed with a single SQL statement
should be processed with a single SQL statement.
`total_tuples` may contain the total number of rows to process.
When not supplied, the value will be computed using the
approriate runner function.
"""
LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)
total_tuples = self._prepare_indexing(runner)
if total_tuples is None:
total_tuples = self._prepare_indexing(runner)
progress = ProgressLogger(runner.name(), total_tuples)