update unit tests for adapted abbreviation code

Sarah Hoffmann 2021-06-09 15:07:36 +02:00
parent 1bd9f455fc
commit 2f6e4edcdb
7 changed files with 78 additions and 51 deletions

View File

@@ -1,7 +1,7 @@
 [MASTER]
 
 extension-pkg-whitelist=osmium
-ignored-modules=icu
+ignored-modules=icu,datrie
 
 [MESSAGES CONTROL]
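The new `datrie` entry matters because, like `icu`, it is a C extension: pylint cannot analyse its members statically and would otherwise emit false `no-member` diagnostics. A minimal illustration of the kind of code that triggers this (the `Trie` construction follows the datrie README and should be treated as an assumption about that API):

```python
# Without "ignored-modules=icu,datrie" in pylintrc, pylint would flag
# members of these C-extension modules that it cannot resolve statically.
import string

import datrie  # C extension: no Python source for pylint to inspect

trie = datrie.Trie(string.ascii_lowercase)  # per the datrie README
trie['king'] = 1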

View File

@@ -58,6 +58,9 @@ class ICUNameProcessorRules:
 
 class ICUNameProcessor:
+    """ Collects the different transformation rules for normalisation of names
+        and provides the functions to apply the transformations.
+    """
 
     def __init__(self, rules):
         self.normalizer = Transliterator.createFromRules("icu_normalization",
View File

@@ -2,11 +2,11 @@
 Helper class to create ICU rules from a configuration file.
 """
 import io
-import yaml
 import logging
 from collections import defaultdict
 import itertools
+import yaml
 
 from icu import Transliterator
 
 from nominatim.errors import UsageError
@@ -20,6 +20,8 @@ class ICURuleLoader:
 
     def __init__(self, configfile):
         self.configfile = configfile
+        self.compound_suffixes = set()
+        self.abbreviations = defaultdict()
 
         if configfile.suffix == '.yaml':
             self._load_from_yaml()
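Initialising `compound_suffixes` and `abbreviations` in `__init__` declares the loader's full state in one place (and silences pylint's attribute-defined-outside-init check), even though the loader methods fill the containers later. A stripped-down sketch of the pattern; the `load()` helper and the `defaultdict(list)` factory are illustrative choices, not the project's code:

```python
from collections import defaultdict

class RuleLoaderSketch:
    def __init__(self):
        # declare all state up front, even when a loader fills it later
        self.compound_suffixes = set()
        self.abbreviations = defaultdict(list)

    def load(self, pairs):
        for full, abbr in pairs:
            self.compound_suffixes.add(full)
            self.abbreviations[full].append(abbr)
```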
@@ -42,7 +44,7 @@ class ICURuleLoader:
                 suffixes.add(suffix)
                 suffixes.update(self.abbreviations.get(suffix, []))
 
-        for suffix in sorted(suffixes, key=lambda x:len(x), reverse=True):
+        for suffix in sorted(suffixes, key=len, reverse=True):
             rules.write("'{0} ' > ' {0} ';".format(suffix))
 
         # Finally add transliteration.
@@ -85,7 +87,7 @@ class ICURuleLoader:
                 synonyms[abbr + ' '].add(' ' + abbr + ' ')
 
         # sort the resulting list by descending length (longer matches are preferred).
-        sorted_keys = sorted(synonyms.keys(), key=lambda x: len(x), reverse=True)
+        sorted_keys = sorted(synonyms.keys(), key=len, reverse=True)
 
         return [(k, list(synonyms[k])) for k in sorted_keys]
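Both replacements are the same cleanup: `len` is already a callable, so wrapping it in a lambda only adds an extra function call per comparison (pylint's unnecessary-lambda warning). Sorting longest-first matters because longer suffixes must win when the generated rules are matched; a quick sketch:

```python
suffixes = {'st', 'str', 'strasse'}

# key=len is equivalent to key=lambda x: len(x), minus the indirection
for suffix in sorted(suffixes, key=len, reverse=True):
    print("'{0} ' > ' {0} ';".format(suffix))
# prints the rule for 'strasse' first, then 'str', then 'st'
```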

View File

@@ -3,16 +3,13 @@ Tokenizer implementing normalisation as used before Nominatim 4 but using
 libICU instead of the PostgreSQL module.
 """
 from collections import Counter
-import functools
 import io
 import itertools
 import json
 import logging
-import re
 from textwrap import dedent
 from pathlib import Path
 
-from icu import Transliterator
 import psycopg2.extras
 
 from nominatim.db.connection import connect
@@ -103,9 +100,7 @@ class LegacyICUTokenizer:
         """
         self.init_from_project()
 
-        if self.normalization is None\
-           or self.transliteration is None\
-           or self.abbreviations is None:
+        if self.naming_rules is None:
             return "Configuration for tokenizer 'legacy_icu' are missing."
 
         return None
@@ -320,40 +315,64 @@ class LegacyICUNameAnalyzer:
             for label, cls, typ, oper in cur:
                 existing_phrases.add((label, cls, typ, oper or '-'))
 
-            to_add = norm_phrases - existing_phrases
-            to_delete = existing_phrases - norm_phrases
-
-            if to_add:
-                copystr = io.StringIO()
-                for word, cls, typ, oper in to_add:
-                    term = self.name_processor.get_search_normalized(word)
-                    if term:
-                        copystr.write(word)
-                        copystr.write('\t ')
-                        copystr.write(term)
-                        copystr.write('\t')
-                        copystr.write(cls)
-                        copystr.write('\t')
-                        copystr.write(typ)
-                        copystr.write('\t')
-                        copystr.write(oper if oper in ('in', 'near') else '\\N')
-                        copystr.write('\t0\n')
-
-                copystr.seek(0)
-                cur.copy_from(copystr, 'word',
-                              columns=['word', 'word_token', 'class', 'type',
-                                       'operator', 'search_name_count'])
-
-            if to_delete and should_replace:
-                psycopg2.extras.execute_values(
-                    cur,
-                    """ DELETE FROM word USING (VALUES %s) as v(name, in_class, in_type, op)
-                        WHERE word = name and class = in_class and type = in_type
-                              and ((op = '-' and operator is null) or op = operator)""",
-                    to_delete)
+            added = self._add_special_phrases(cur, norm_phrases, existing_phrases)
+            if should_replace:
+                deleted = self._remove_special_phrases(cur, norm_phrases,
+                                                       existing_phrases)
+            else:
+                deleted = 0
 
         LOG.info("Total phrases: %s. Added: %s. Deleted: %s",
-                 len(norm_phrases), len(to_add), len(to_delete))
+                 len(norm_phrases), added, deleted)
+
+
+    def _add_special_phrases(self, cursor, new_phrases, existing_phrases):
+        """ Add all phrases to the database that are not yet there.
+        """
+        to_add = new_phrases - existing_phrases
+
+        copystr = io.StringIO()
+        added = 0
+        for word, cls, typ, oper in to_add:
+            term = self.name_processor.get_search_normalized(word)
+            if term:
+                copystr.write(word)
+                copystr.write('\t ')
+                copystr.write(term)
+                copystr.write('\t')
+                copystr.write(cls)
+                copystr.write('\t')
+                copystr.write(typ)
+                copystr.write('\t')
+                copystr.write(oper if oper in ('in', 'near') else '\\N')
+                copystr.write('\t0\n')
+                added += 1
+
+        if copystr.tell() > 0:
+            copystr.seek(0)
+            cursor.copy_from(copystr, 'word',
+                             columns=['word', 'word_token', 'class', 'type',
+                                      'operator', 'search_name_count'])
+
+        return added
+
+
+    def _remove_special_phrases(self, cursor, new_phrases, existing_phrases):
+        """ Remove all phrases from the database that are no longer in the
+            new phrase list.
+        """
+        to_delete = existing_phrases - new_phrases
+
+        if to_delete:
+            psycopg2.extras.execute_values(
+                cursor,
+                """ DELETE FROM word USING (VALUES %s) as v(name, in_class, in_type, op)
+                    WHERE word = name and class = in_class and type = in_type
+                          and ((op = '-' and operator is null) or op = operator)""",
+                to_delete)
+
+        return len(to_delete)
+
+
     def add_country_names(self, country_code, names):
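The refactoring splits one long method into two helpers but keeps both bulk-I/O techniques: building a tab-separated buffer in memory and streaming it through psycopg2's `copy_from`, and deleting many rows in one statement with `psycopg2.extras.execute_values`. A self-contained sketch of the two techniques against a hypothetical `phrase` table (connection string and schema are assumptions, not Nominatim's):

```python
import io

import psycopg2
import psycopg2.extras

conn = psycopg2.connect("dbname=test")  # hypothetical database
with conn.cursor() as cur:
    cur.execute("CREATE TABLE IF NOT EXISTS phrase (word TEXT, class TEXT)")

    # Bulk insert: COPY from an in-memory, tab-separated buffer
    # (copy_from's default separator is a tab).
    buf = io.StringIO()
    for word, cls in [('king', 'amenity'), ('street', 'highway')]:
        buf.write(word + '\t' + cls + '\n')
    buf.seek(0)
    cur.copy_from(buf, 'phrase', columns=['word', 'class'])

    # Bulk delete: one statement for many rows, mirroring the diff's
    # DELETE ... USING (VALUES %s) pattern.
    psycopg2.extras.execute_values(
        cur,
        """DELETE FROM phrase USING (VALUES %s) as v(w, c)
           WHERE word = v.w AND class = v.c""",
        [('king', 'amenity')])
conn.commit()
```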
@@ -451,7 +470,8 @@ class LegacyICUNameAnalyzer:
         return full_tokens, partial_tokens
 
 
-    def _compute_full_names(self, names):
+    @staticmethod
+    def _compute_full_names(names):
         """ Return the set of all full name word ids to be used with the
             given dictionary of names.
         """
@@ -534,7 +554,7 @@ class _TokenInfo:
         self.data['hnr'] = ';'.join(hnrs)
 
 
-    def add_street(self, fulls, partials):
+    def add_street(self, fulls, _):
         """ Add addr:street match terms.
         """
         if fulls:
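Renaming the unused `partials` parameter to `_` keeps the signature the call site expects while telling readers and pylint (unused-argument) that the value is deliberately ignored. A minimal sketch of the idiom, with an invented class body:

```python
class TokenInfoSketch:
    def __init__(self):
        self.data = {}

    def add_street(self, fulls, _):
        # the second argument is required by the caller but ignored here
        if fulls:
            self.data['street'] = sorted(fulls)
```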

View File

@@ -260,7 +260,9 @@ def test_update_special_phrase_modify(analyzer, word_table, make_standard_name):
 def test_add_country_names(analyzer, word_table, make_standard_name):
-    analyzer.add_country_names('de', ['Germany', 'Deutschland', 'germany'])
+    analyzer.add_country_names('de', {'name': 'Germany',
+                                      'name:de': 'Deutschland',
+                                      'short_name': 'germany'})
 
     assert word_table.get_country() \
         == {('de', ' #germany#'),
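The tests now hand `add_country_names` the raw OSM tag dictionary (`name`, `name:de`, `short_name`, ...) instead of a pre-flattened list, so the analyzer itself decides which values to index. The expected table content is unchanged because only the tag values matter; roughly:

```python
names = {'name': 'Germany', 'name:de': 'Deutschland', 'short_name': 'germany'}

# the analyzer works on the tag values, not the keys
terms = {term.lower() for term in names.values()}
assert terms == {'germany', 'deutschland'}
```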
@@ -272,7 +274,7 @@ def test_add_more_country_names(analyzer, word_table, make_standard_name):
     word_table.add_country('it', ' #italy#')
     word_table.add_country('it', ' #itala#')
 
-    analyzer.add_country_names('it', ['Italy', 'IT'])
+    analyzer.add_country_names('it', {'name': 'Italy', 'ref': 'IT'})
 
     assert word_table.get_country() \
         == {('fr', ' #france#'),

View File

@@ -212,14 +212,14 @@ def test_update_postcodes_from_db_add_and_remove(analyzer, table_factory, word_t
 def test_update_special_phrase_empty_table(analyzer, word_table):
     with analyzer() as anl:
         anl.update_special_phrases([
-            ("König bei", "amenity", "royal", "near"),
-            ("Könige", "amenity", "royal", "-"),
+            ("König bei", "amenity", "royal", "near"),
+            ("Könige ", "amenity", "royal", "-"),
             ("street", "highway", "primary", "in")
         ], True)
 
     assert word_table.get_special() \
-        == {(' KÖNIG BEI', 'könig bei', 'amenity', 'royal', 'near'),
-            (' KÖNIGE', 'könige', 'amenity', 'royal', None),
+        == {(' KÖNIG BEI', 'König bei', 'amenity', 'royal', 'near'),
+            (' KÖNIGE', 'Könige', 'amenity', 'royal', None),
             (' STREET', 'street', 'highway', 'primary', 'in')}
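The adjusted expectations encode the behaviour change: the `word` column now keeps the phrase as entered (whitespace-trimmed) rather than a lower-cased copy, and only `word_token` carries the search-normalised form. A stand-in sketch of that mapping, where `normalize` is a placeholder for `get_search_normalized` (upper-casing here mimics the test's mocked rules, not production normalisation):

```python
def normalize(term):
    # placeholder for name_processor.get_search_normalized()
    return ' ' + ' '.join(term.split()).upper()

phrases = [("König bei", "amenity", "royal", "near"),
           ("Könige ", "amenity", "royal", "-")]

rows = {(normalize(w), w.strip(), cls, typ, op if op != '-' else None)
        for w, cls, typ, op in phrases}
print(rows)
# {(' KÖNIG BEI', 'König bei', ..., 'near'), (' KÖNIGE', 'Könige', ..., None)}
```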

View File

@@ -180,7 +180,7 @@ def test_create_country_names(temp_db_with_extensions, temp_db_conn, temp_db_cur
     assert len(tokenizer.analyser_cache['countries']) == 2
 
-    result_set = {k: set(v) for k, v in tokenizer.analyser_cache['countries']}
+    result_set = {k: set(v.values()) for k, v in tokenizer.analyser_cache['countries']}
 
     if languages:
         assert result_set == {'us' : set(('us', 'us1', 'United States')),
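This fix matters because iterating a dict yields its keys: with the cache now storing name dictionaries per country, `set(v)` would collect tag names like `'name'` rather than the names themselves. Compare:

```python
cache = [('us', {'name': 'United States', 'ref': 'us1'})]

assert {k: set(v) for k, v in cache} == {'us': {'name', 'ref'}}
assert {k: set(v.values()) for k, v in cache} == {'us': {'United States', 'us1'}}
```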