switch from osmosis to pyosmium for updates

Pyosmium comes with convenient functions for finding the
right state and does not require external files for
rembering the state. Updates can now conveniently
set up by simply running ./utils/update.php --init-updates
and state is kept directly in the import_status table.

This change requires an update in the database schema.
Run the following to update:

ALTER TABLE import_status ADD COLUMN sequence_id integer;
ALTER TABLE import_status ADD COLUMN indexed boolean;
ALTER TABLE import_osmosis_log ADD COLUMN batchseq integer;
This commit is contained in:
Sarah Hoffmann 2017-05-25 16:26:09 +02:00
parent 6830b1229e
commit bd4b1b2d08
5 changed files with 133 additions and 201 deletions

View File

@ -33,6 +33,18 @@ function getCacheMemoryMB()
return (int)($aMatches[1]/1024);
}
function getDatabaseDate(&$oDB)
{
// Find the newest node in the DB
$iLastOSMID = $oDB->getOne("select max(osm_id) from place where osm_type = 'N'");
// Lookup the timestamp that node was created
$sLastNodeURL = 'http://www.openstreetmap.org/api/0.6/node/'.$iLastOSMID."/1";
$sLastNodeXML = file_get_contents($sLastNodeURL);
preg_match('#timestamp="(([0-9]{4})-([0-9]{2})-([0-9]{2})T([0-9]{2}):([0-9]{2}):([0-9]{2})Z)"#', $sLastNodeXML, $aLastNodeDate);
return $aLastNodeDate[1];
}
function bySearchRank($a, $b)
{

View File

@ -41,7 +41,7 @@ if (isset($_GET['debug']) && $_GET['debug']) @define('CONST_Debug', true);
// Paths
@define('CONST_ExtraDataPath', CONST_BasePath.'/data');
@define('CONST_Osm2pgsql_Binary', CONST_InstallPath.'/osm2pgsql/osm2pgsql');
@define('CONST_Osmosis_Binary', '@OSMOSIS_PATH@');
@define('CONST_Pyosmium_Binary', 'pyosmium-get-changes');
@define('CONST_Tiger_Data_Path', CONST_ExtraDataPath.'/tiger');
@define('CONST_Wikipedia_Data_Path', CONST_ExtraDataPath);
@ -67,7 +67,7 @@ if (isset($_GET['debug']) && $_GET['debug']) @define('CONST_Debug', true);
// Replication settings
@define('CONST_Replication_Url', 'http://planet.openstreetmap.org/replication/minute');
@define('CONST_Replication_MaxInterval', '3600');
@define('CONST_Replication_Max_Diff_size', '30'); // MB of update data to download per batch
@define('CONST_Replication_Update_Interval', '60'); // How often upstream publishes diffs
@define('CONST_Replication_Recheck_Interval', '60'); // How long to sleep if no update found yet

View File

@ -1,12 +1,15 @@
drop table if exists import_status;
CREATE TABLE import_status (
lastimportdate timestamp NOT NULL
lastimportdate timestamp NOT NULL,
sequence_id integer,
indexed boolean
);
GRANT SELECT ON import_status TO "{www-user}" ;
drop table if exists import_osmosis_log;
CREATE TABLE import_osmosis_log (
batchend timestamp,
batchseq integer,
batchsize integer,
starttime timestamp,
endtime timestamp,

View File

@ -404,6 +404,12 @@ if ($aCMDResult['load-data'] || $aCMDResult['all']) {
echo "\n";
echo "Reanalysing database...\n";
pgsqlRunScript('ANALYSE');
$sDatabaseDate = getDatabaseDate($oDB);
pg_query($oDB->connection, 'TRUNCATE import_status');
$sSQL = "INSERT INTO import_status (lastimportdate) VALUES('".$sDatabaseDate."')";
pg_query($oDB->connection, $sSQL);
echo "Latest data imported from $sDatabaseDate.\n";
}
if ($aCMDResult['import-tiger-data']) {
@ -500,83 +506,9 @@ if ($aCMDResult['calculate-postcodes'] || $aCMDResult['all']) {
}
}
if ($aCMDResult['osmosis-init'] || ($aCMDResult['all'] && !$aCMDResult['drop'])) { // no use doing osmosis-init when dropping update tables
if ($aCMDResult['osmosis-init']) {
$bDidSomething = true;
$oDB =& getDB();
if (!file_exists(CONST_Osmosis_Binary)) {
echo "Please download osmosis.\nIf it is already installed, check the path in your local settings (settings/local.php) file.\n";
if (!$aCMDResult['all']) {
fail("osmosis not found in '".CONST_Osmosis_Binary."'");
}
} else {
if (file_exists(CONST_InstallPath.'/settings/configuration.txt')) {
echo "settings/configuration.txt already exists\n";
} else {
passthru(CONST_Osmosis_Binary.' --read-replication-interval-init '.CONST_InstallPath.'/settings');
// update osmosis configuration.txt with our settings
passthru("sed -i 's!baseUrl=.*!baseUrl=".CONST_Replication_Url."!' ".CONST_InstallPath.'/settings/configuration.txt');
passthru("sed -i 's:maxInterval = .*:maxInterval = ".CONST_Replication_MaxInterval.":' ".CONST_InstallPath.'/settings/configuration.txt');
}
// Find the last node in the DB
$iLastOSMID = $oDB->getOne("select max(osm_id) from place where osm_type = 'N'");
// Lookup the timestamp that node was created (less 3 hours for margin for changsets to be closed)
$sLastNodeURL = 'http://www.openstreetmap.org/api/0.6/node/'.$iLastOSMID."/1";
$sLastNodeXML = file_get_contents($sLastNodeURL);
preg_match('#timestamp="(([0-9]{4})-([0-9]{2})-([0-9]{2})T([0-9]{2}):([0-9]{2}):([0-9]{2})Z)"#', $sLastNodeXML, $aLastNodeDate);
$iLastNodeTimestamp = strtotime($aLastNodeDate[1]) - (3*60*60);
// Search for the correct state file - uses file timestamps so need to sort by date descending
$sRepURL = CONST_Replication_Url."/";
$sRep = file_get_contents($sRepURL."?C=M;O=D;F=1");
// download.geofabrik.de: <a href="000/">000/</a></td><td align="right">26-Feb-2013 11:53 </td>
// planet.openstreetmap.org: <a href="273/">273/</a> 2013-03-11 07:41 -
preg_match_all('#<a href="[0-9]{3}/">([0-9]{3}/)</a>\s*([-0-9a-zA-Z]+ [0-9]{2}:[0-9]{2})#', $sRep, $aRepMatches, PREG_SET_ORDER);
if ($aRepMatches) {
$aPrevRepMatch = false;
foreach ($aRepMatches as $aRepMatch) {
if (strtotime($aRepMatch[2]) < $iLastNodeTimestamp) break;
$aPrevRepMatch = $aRepMatch;
}
if ($aPrevRepMatch) $aRepMatch = $aPrevRepMatch;
$sRepURL .= $aRepMatch[1];
$sRep = file_get_contents($sRepURL."?C=M;O=D;F=1");
preg_match_all('#<a href="[0-9]{3}/">([0-9]{3}/)</a>\s*([-0-9a-zA-Z]+ [0-9]{2}:[0-9]{2})#', $sRep, $aRepMatches, PREG_SET_ORDER);
$aPrevRepMatch = false;
foreach ($aRepMatches as $aRepMatch) {
if (strtotime($aRepMatch[2]) < $iLastNodeTimestamp) break;
$aPrevRepMatch = $aRepMatch;
}
if ($aPrevRepMatch) $aRepMatch = $aPrevRepMatch;
$sRepURL .= $aRepMatch[1];
$sRep = file_get_contents($sRepURL."?C=M;O=D;F=1");
preg_match_all('#<a href="[0-9]{3}.state.txt">([0-9]{3}).state.txt</a>\s*([-0-9a-zA-Z]+ [0-9]{2}:[0-9]{2})#', $sRep, $aRepMatches, PREG_SET_ORDER);
$aPrevRepMatch = false;
foreach ($aRepMatches as $aRepMatch) {
if (strtotime($aRepMatch[2]) < $iLastNodeTimestamp) break;
$aPrevRepMatch = $aRepMatch;
}
if ($aPrevRepMatch) $aRepMatch = $aPrevRepMatch;
$sRepURL .= $aRepMatch[1].'.state.txt';
echo "Getting state file: $sRepURL\n";
$sStateFile = file_get_contents($sRepURL);
if (!$sStateFile || strlen($sStateFile) > 1000) fail("unable to obtain state file");
file_put_contents(CONST_InstallPath.'/settings/state.txt', $sStateFile);
echo "Updating DB status\n";
pg_query($oDB->connection, 'TRUNCATE import_status');
$sSQL = "INSERT INTO import_status VALUES('".$aRepMatch[2]."')";
pg_query($oDB->connection, $sSQL);
} else {
if (!$aCMDResult['all']) {
fail("Cannot read state file directory.");
}
}
}
echo "Command 'osmosis-init' no longer available, please use utils/update.php --init-updates.\n";
}
if ($aCMDResult['index'] || $aCMDResult['all']) {

View File

@ -12,8 +12,9 @@ $aCMDOptions
array('quiet', 'q', 0, 1, 0, 0, 'bool', 'Quiet output'),
array('verbose', 'v', 0, 1, 0, 0, 'bool', 'Verbose output'),
array('import-osmosis', '', 0, 1, 0, 0, 'bool', 'Import using osmosis'),
array('import-osmosis-all', '', 0, 1, 0, 0, 'bool', 'Import using osmosis forever'),
array('init-updates', '', 0, 1, 0, 0, 'bool', 'Set up database for updating'),
array('import-osmosis', '', 0, 1, 0, 0, 'bool', 'Import updates once'),
array('import-osmosis-all', '', 0, 1, 0, 0, 'bool', 'Import updates forever'),
array('no-npi', '', 0, 1, 0, 0, 'bool', '(obsolate)'),
array('no-index', '', 0, 1, 0, 0, 'bool', 'Do not index the new data'),
@ -57,10 +58,39 @@ if (!is_null(CONST_Osm2pgsql_Flatnode_File)) {
$sOsm2pgsqlCmd .= ' --flat-nodes '.CONST_Osm2pgsql_Flatnode_File;
}
if ($aResult['init-updates']) {
$sSetup = CONST_InstallPath.'/utils/setup.php';
$iRet = -1;
passthru($sSetup.' --create-functions --enable-diff-updates', $iRet);
if ($iRet != 0) {
fail('Error running setup script');
}
if (isset($aResult['import-diff'])) {
// import diff directly (e.g. from osmosis --rri)
$sNextFile = $aResult['import-diff'];
$sDatabaseDate = getDatabaseDate($oDB);
$sWindBack = strftime('%Y-%m-%dT%H:%M:%SZ',
strtotime($sDatabaseDate) - (3*60*60));
// get the appropriate state id
$aOutput = 0;
exec(CONST_Pyosmium_Get_Changes.' -D '.$sWindBack.' --server '.CONST_Replication_Url,
$aOutput, $iRet);
if ($iRet != 0) {
fail('Error running pyosmium tools');
}
pg_query($oDB->connection, 'TRUNCATE import_status');
$sSQL = "INSERT INTO import_status (lastimportdate, sequence_id, indexed) VALUES('";
$sSQL .= $sDatabaseDate."',".$aOutput[0].", true)";
if (!pg_query($oDB->connection, $sSQL)) {
fail("Could not enter sequence into database.");
}
echo "Done. Database updates will start at sequence $aOutput[0] ($sWindBack)\n";
}
if (isset($aResult['import-diff']) || isset($aResult['import-file'])) {
// import diffs and files directly (e.g. from osmosis --rri)
$sNextFile = isset($aResult['import-diff']) ? $aResult['import-diff'] : $aResult['import-file'];
if (!file_exists($sNextFile)) {
fail("Cannot open $sNextFile\n");
}
@ -79,16 +109,6 @@ if (isset($aResult['import-diff'])) {
$sTemporaryFile = CONST_BasePath.'/data/osmosischange.osc';
$bHaveDiff = false;
if (isset($aResult['import-file']) && $aResult['import-file']) {
$bHaveDiff = true;
$sCMD = CONST_Osmosis_Binary.' --read-xml \''.$aResult['import-file'].'\' --read-empty --derive-change --write-xml-change '.$sTemporaryFile;
echo $sCMD."\n";
exec($sCMD, $sJunk, $iErrorLevel);
if ($iErrorLevel) {
fail("Error converting osm to osc, osmosis returned: $iErrorLevel\n");
}
}
$bUseOSMApi = isset($aResult['import-from-main-api']) && $aResult['import-from-main-api'];
$sContentURL = '';
if (isset($aResult['import-node']) && $aResult['import-node']) {
@ -116,33 +136,8 @@ if (isset($aResult['import-relation']) && $aResult['import-relation']) {
}
if ($sContentURL) {
$sModifyXMLstr = file_get_contents($sContentURL);
file_put_contents($sTemporaryFile, file_get_contents($sContentURL));
$bHaveDiff = true;
$aSpec = array(
0 => array("pipe", "r"), // stdin
1 => array("pipe", "w"), // stdout
2 => array("pipe", "w") // stderr
);
$sCMD = CONST_Osmosis_Binary.' --read-xml - --read-empty --derive-change --write-xml-change '.$sTemporaryFile;
echo $sCMD."\n";
$hProc = proc_open($sCMD, $aSpec, $aPipes);
if (!is_resource($hProc)) {
fail("Error converting osm to osc, osmosis failed\n");
}
fwrite($aPipes[0], $sModifyXMLstr);
fclose($aPipes[0]);
$sOut = stream_get_contents($aPipes[1]);
if ($aResult['verbose']) echo $sOut;
fclose($aPipes[1]);
$sErrors = stream_get_contents($aPipes[2]);
if ($aResult['verbose']) echo $sErrors;
fclose($aPipes[2]);
if ($iError = proc_close($hProc)) {
echo $sOut;
echo $sErrors;
fail("Error converting osm to osc, osmosis returned: $iError\n");
}
}
if ($bHaveDiff) {
@ -166,7 +161,7 @@ if ($aResult['deduplicate']) {
$aPartitions = chksql($oDB->getCol($sSQL));
$aPartitions[] = 0;
// we don't care about empty search_name_* artitions, they can't contain mentions of duplicates
// we don't care about empty search_name_* partitions, they can't contain mentions of duplicates
foreach ($aPartitions as $i => $sPartition) {
$sSQL = "select count(*) from search_name_".$sPartition;
$nEntries = chksql($oDB->getOne($sSQL));
@ -236,10 +231,8 @@ if ($aResult['import-osmosis'] || $aResult['import-osmosis-all']) {
fail("Error: Update interval too low for download.geofabrik.de. Please check install documentation (http://wiki.openstreetmap.org/wiki/Nominatim/Installation#Updates)\n");
}
$sImportFile = CONST_BasePath.'/data/osmosischange.osc';
$sOsmosisConfigDirectory = CONST_InstallPath.'/settings';
$sCMDDownload = CONST_Osmosis_Binary.' --read-replication-interval workingDirectory='.$sOsmosisConfigDirectory.' --simplify-change --write-xml-change '.$sImportFile;
$sCMDCheckReplicationLag = CONST_Osmosis_Binary.' -q --read-replication-lag workingDirectory='.$sOsmosisConfigDirectory;
$sImportFile = CONST_InstallPath.'/osmosischange.osc';
$sCMDDownload = CONST_Pyosmium_Get_Changes.' --server '.CONST_Replication_Url.' -o '.$sImportFile.' -s '.CONST_Replication_Max_Diff_size;
$sCMDImport = $sOsm2pgsqlCmd.' '.$sImportFile;
$sCMDIndex = CONST_InstallPath.'/nominatim/nominatim -i -d '.$aDSNInfo['database'].' -P '.$aDSNInfo['port'].' -t '.$aResult['index-instances'];
@ -247,103 +240,95 @@ if ($aResult['import-osmosis'] || $aResult['import-osmosis-all']) {
$fStartTime = time();
$iFileSize = 1001;
if (!file_exists($sImportFile)) {
// First check if there are new updates published (except for minutelies - there's always new diffs to process)
if (CONST_Replication_Update_Interval > 60) {
unset($aReplicationLag);
exec($sCMDCheckReplicationLag, $aReplicationLag, $iErrorLevel);
while ($iErrorLevel > 0 || $aReplicationLag[0] < 1) {
if ($iErrorLevel) {
echo "Error: $iErrorLevel. ";
echo "Re-trying: ".$sCMDCheckReplicationLag." in ".CONST_Replication_Recheck_Interval." secs\n";
} else {
echo ".";
}
$aLastState = chksql($oDB->getRow('SELECT * FROM import_status'));
if (!$aLastState['sequence_id']) {
echo "Updates not set up. Please run ./utils/update.php --init-updates.\n";
exit(1);
}
echo 'Currently at sequence '.$aLastState['sequence_id'].' ('.$aLastState['lastimportdate'].') - '.$aLastState['indexed']." indexed\n";
$sBatchEnd = $aLastState['lastimportdate'];
$iEndSequence = $aLastState['sequence_id'];
if ($aLastState['indexed'] == 't') {
// Sleep if the update interval has not yet been reached.
$fNextUpdate = $aLastState['lastimportdate'] + CONST_Replication_Update_Interval;
if ($fNextUpdate > $fStartTime) {
$iSleepTime = $fNextUpdate - $fStartTime;
echo "Waiting for next update for $iSleepTime sec.";
sleep($iSleepTime);
}
// Download the next batch of changes.
unlink($sImportFile);
do {
$fCMDStartTime = time();
$iNextSeq = (int) $aLastState['sequence_id'] + 1;
unset($aOutput);
echo "$sCMDDownload -I $iNextSeq\n";
exec($sCMDDownload.' -I '.$iNextSeq, $aOutput, $iResult);
if ($iResult == 3) {
echo 'No new updates. Sleeping for '.CONST_Replication_Recheck_Interval." sec.\n";
sleep(CONST_Replication_Recheck_Interval);
unset($aReplicationLag);
exec($sCMDCheckReplicationLag, $aReplicationLag, $iErrorLevel);
} else if ($iResult != 0) {
echo 'ERROR: updates failed.';
exit($iResult);
} else {
$iEndSequence = (int)$aOutput[0];
}
// There are new replication files - use osmosis to download the file
echo "\n".date('Y-m-d H:i:s')." Replication Delay is ".$aReplicationLag[0]."\n";
}
$fStartTime = time();
} while ($iResult);
// Import the file
$fCMDStartTime = time();
echo $sCMDDownload."\n";
exec($sCMDDownload, $sJunk, $iErrorLevel);
while ($iErrorLevel > 0) {
echo "Error: $iErrorLevel\n";
sleep(60);
echo 'Re-trying: '.$sCMDDownload."\n";
exec($sCMDDownload, $sJunk, $iErrorLevel);
echo $sCMDImport."\n";
unset($sJunk);
exec($sCMDImport, $sJunk, $iErrorLevel);
if ($iErrorLevel) {
echo "Error executing osm2pgsql: $iErrorLevel\n";
exit($iErrorLevel);
}
// write the update logs
$iFileSize = filesize($sImportFile);
$sBatchEnd = getosmosistimestamp($sOsmosisConfigDirectory);
$sSQL = "INSERT INTO import_osmosis_log values ('$sBatchEnd',$iFileSize,'".date('Y-m-d H:i:s', $fCMDStartTime)."','".date('Y-m-d H:i:s')."','osmosis')";
$sBatchEnd = getDatabaseDate($oDB);
$sSQL = "INSERT INTO import_osmosis_log (batchend, batchseq, batchsize, starttime, endtime, event) values ('$sBatchEnd',$iEndSequence,$iFileSize,'".date('Y-m-d H:i:s', $fCMDStartTime)."','".date('Y-m-d H:i:s')."','import')";
var_Dump($sSQL);
$oDB->query($sSQL);
echo date('Y-m-d H:i:s')." Completed osmosis step for $sBatchEnd in ".round((time()-$fCMDStartTime)/60, 2)." minutes\n";
chksql($oDB->query($sSQL));
// update the status
$sSQL = "UPDATE import_status SET lastimportdate = '$sBatchEnd', indexed=false, sequence_id = $iEndSequence";
var_Dump($sSQL);
chksql($oDB->query($sSQL));
echo date('Y-m-d H:i:s')." Completed download step for $sBatchEnd in ".round((time()-$fCMDStartTime)/60, 2)." minutes\n";
}
$iFileSize = filesize($sImportFile);
$sBatchEnd = getosmosistimestamp($sOsmosisConfigDirectory);
// Import the file
$fCMDStartTime = time();
echo $sCMDImport."\n";
exec($sCMDImport, $sJunk, $iErrorLevel);
if ($iErrorLevel) {
echo "Error: $iErrorLevel\n";
exit($iErrorLevel);
}
$sSQL = "INSERT INTO import_osmosis_log values ('$sBatchEnd',$iFileSize,'".date('Y-m-d H:i:s', $fCMDStartTime)."','".date('Y-m-d H:i:s')."','osm2pgsql')";
var_Dump($sSQL);
$oDB->query($sSQL);
echo date('Y-m-d H:i:s')." Completed osm2pgsql step for $sBatchEnd in ".round((time()-$fCMDStartTime)/60, 2)." minutes\n";
// Archive for debug?
unlink($sImportFile);
$sBatchEnd = getosmosistimestamp($sOsmosisConfigDirectory);
// Index file
$sThisIndexCmd = $sCMDIndex;
$fCMDStartTime = time();
if (!$aResult['no-index']) {
$sThisIndexCmd = $sCMDIndex;
$fCMDStartTime = time();
echo "$sThisIndexCmd\n";
exec($sThisIndexCmd, $sJunk, $iErrorLevel);
if ($iErrorLevel) {
echo "Error: $iErrorLevel\n";
exit($iErrorLevel);
}
$sSQL = "INSERT INTO import_osmosis_log (batchend, batchseq, batchsize, starttime, endtime, event) values ('$sBatchEnd',$iEndSequence,$iFileSize,'".date('Y-m-d H:i:s', $fCMDStartTime)."','".date('Y-m-d H:i:s')."','index')";
var_Dump($sSQL);
$oDB->query($sSQL);
echo date('Y-m-d H:i:s')." Completed index step for $sBatchEnd in ".round((time()-$fCMDStartTime)/60, 2)." minutes\n";
$sSQL = "update import_status set indexed = true";
$oDB->query($sSQL);
}
$sSQL = "INSERT INTO import_osmosis_log values ('$sBatchEnd',$iFileSize,'".date('Y-m-d H:i:s', $fCMDStartTime)."','".date('Y-m-d H:i:s')."','index')";
var_Dump($sSQL);
$oDB->query($sSQL);
echo date('Y-m-d H:i:s')." Completed index step for $sBatchEnd in ".round((time()-$fCMDStartTime)/60, 2)." minutes\n";
$sSQL = "update import_status set lastimportdate = '$sBatchEnd'";
$oDB->query($sSQL);
$fDuration = time() - $fStartTime;
echo date('Y-m-d H:i:s')." Completed all for $sBatchEnd in ".round($fDuration/60, 2)." minutes\n";
if (!$aResult['import-osmosis-all']) exit(0);
if (CONST_Replication_Update_Interval > 60) {
$iSleep = max(0, (strtotime($sBatchEnd)+CONST_Replication_Update_Interval-time()));
} else {
$iSleep = max(0, CONST_Replication_Update_Interval-$fDuration);
}
echo date('Y-m-d H:i:s')." Sleeping $iSleep seconds\n";
sleep($iSleep);
}
}
function getosmosistimestamp($sOsmosisConfigDirectory)
{
$sStateFile = file_get_contents($sOsmosisConfigDirectory.'/state.txt');
preg_match('#timestamp=(.+)#', $sStateFile, $aResult);
return str_replace('\:', ':', $aResult[1]);
}