fix timeout use for replication timeout

The timeout parameter is no longer taken into account since
pyosmium switched to the requests library. This adds the parameter
back.
This commit is contained in:
Sarah Hoffmann 2022-11-08 21:45:36 +01:00
parent 30f526c943
commit 1fdcec985a
2 changed files with 37 additions and 6 deletions

View File

@ -148,7 +148,7 @@ class UpdateReplication:
while True:
with connect(args.config.get_libpq_dsn()) as conn:
start = dt.datetime.now(dt.timezone.utc)
state = replication.update(conn, params)
state = replication.update(conn, params, socket_timeout=args.socket_timeout)
if state is not replication.UpdateState.NO_CHANGES:
status.log_status(conn, start, 'import')
batchdate, _, _ = status.get_status(conn)

View File

@ -7,13 +7,16 @@
"""
Functions for updating a database from a replication source.
"""
from typing import ContextManager, MutableMapping, Any, Generator, cast
from typing import ContextManager, MutableMapping, Any, Generator, cast, Iterator
from contextlib import contextmanager
import datetime as dt
from enum import Enum
import logging
import time
import types
import urllib.request as urlrequest
import requests
from nominatim.db import status
from nominatim.db.connection import Connection
from nominatim.tools.exec_utils import run_osm2pgsql
@ -22,6 +25,7 @@ from nominatim.errors import UsageError
try:
from osmium.replication.server import ReplicationServer
from osmium import WriteHandler
from osmium import version as pyo_version
except ImportError as exc:
logging.getLogger().critical("pyosmium not installed. Replication functions not available.\n"
"To install pyosmium via pip: pip3 install osmium")
@ -86,7 +90,8 @@ class UpdateState(Enum):
NO_CHANGES = 3
def update(conn: Connection, options: MutableMapping[str, Any]) -> UpdateState:
def update(conn: Connection, options: MutableMapping[str, Any],
socket_timeout: int = 60) -> UpdateState:
""" Update database from the next batch of data. Returns the state of
updates according to `UpdateState`.
"""
@ -114,7 +119,7 @@ def update(conn: Connection, options: MutableMapping[str, Any]) -> UpdateState:
options['import_file'].unlink()
# Read updates into file.
with _make_replication_server(options['base_url']) as repl:
with _make_replication_server(options['base_url'], socket_timeout) as repl:
outhandler = WriteHandler(str(options['import_file']))
endseq = repl.apply_diffs(outhandler, startseq + 1,
max_size=options['max_diff_size'] * 1024)
@ -136,14 +141,40 @@ def update(conn: Connection, options: MutableMapping[str, Any]) -> UpdateState:
return UpdateState.UP_TO_DATE
def _make_replication_server(url: str) -> ContextManager[ReplicationServer]:
def _make_replication_server(url: str, timeout: int) -> ContextManager[ReplicationServer]:
""" Returns a ReplicationServer in form of a context manager.
Creates a light wrapper around older versions of pyosmium that did
not support the context manager interface.
"""
if hasattr(ReplicationServer, '__enter__'):
return cast(ContextManager[ReplicationServer], ReplicationServer(url))
# Patches the open_url function for pyosmium >= 3.2
# where the socket timeout is no longer respected.
def patched_open_url(self: ReplicationServer, url: urlrequest.Request) -> Any:
""" Download a resource from the given URL and return a byte sequence
of the content.
"""
get_params = {
'headers': {"User-Agent" : f"Nominatim (pyosmium/{pyo_version.pyosmium_release})"},
'timeout': timeout or None,
'stream': True
}
if self.session is not None:
return self.session.get(url.get_full_url(), **get_params)
@contextmanager
def _get_url_with_session() -> Iterator[requests.Response]:
with requests.Session() as session:
request = session.get(url.get_full_url(), **get_params) # type: ignore
yield request
return _get_url_with_session()
repl = ReplicationServer(url)
repl.open_url = types.MethodType(patched_open_url, repl)
return cast(ContextManager[ReplicationServer], repl)
@contextmanager
def get_cm() -> Generator[ReplicationServer, None, None]: