2010-10-24 03:12:37 +04:00
/*
*/
# include <stdio.h>
# include <unistd.h>
# include <stdlib.h>
# include <string.h>
# include <assert.h>
# include <pthread.h>
# include <time.h>
# include <stdint.h>
# include <libpq-fe.h>
# include "nominatim.h"
# include "index.h"
# include "export.h"
# include "postgresql.h"
extern int verbose ;
void nominatim_index ( int rank_min , int rank_max , int num_threads , const char * conninfo , const char * structuredoutputfile )
{
2010-12-10 19:13:07 +03:00
struct index_thread_data * thread_data ;
pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER ;
int tuples , count , sleepcount ;
time_t rankStartTime ;
int rankTotalTuples ;
int rankCountTuples ;
float rankPerSecond ;
PGconn * conn ;
PGresult * res ;
PGresult * resSectors ;
PGresult * resPlaces ;
PGresult * resNULL ;
int rank ;
int i ;
int iSector ;
int iResult ;
2010-10-24 03:12:37 +04:00
const char * paramValues [ 2 ] ;
int paramLengths [ 2 ] ;
int paramFormats [ 2 ] ;
uint32_t paramRank ;
uint32_t paramSector ;
uint32_t sector ;
xmlTextWriterPtr writer ;
2010-12-10 19:13:07 +03:00
pthread_mutex_t writer_mutex = PTHREAD_MUTEX_INITIALIZER ;
2010-10-24 03:12:37 +04:00
Oid pg_prepare_params [ 2 ] ;
2010-10-26 19:22:41 +04:00
conn = PQconnectdb ( conninfo ) ;
2010-12-10 19:13:07 +03:00
if ( PQstatus ( conn ) ! = CONNECTION_OK )
{
2010-10-24 03:12:37 +04:00
fprintf ( stderr , " Connection to database failed: %s \n " , PQerrorMessage ( conn ) ) ;
exit ( EXIT_FAILURE ) ;
}
pg_prepare_params [ 0 ] = PG_OID_INT4 ;
res = PQprepare ( conn , " index_sectors " ,
2010-12-10 19:13:07 +03:00
" select geometry_sector,count(*) from placex where rank_search = $1 and indexed_status > 0 group by geometry_sector order by geometry_sector " ,
1 , pg_prepare_params ) ;
2010-10-26 19:22:41 +04:00
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
{
fprintf ( stderr , " Failed preparing index_sectors: %s \n " , PQerrorMessage ( conn ) ) ;
exit ( EXIT_FAILURE ) ;
}
2010-10-24 03:12:37 +04:00
PQclear ( res ) ;
2010-11-02 14:37:11 +03:00
pg_prepare_params [ 0 ] = PG_OID_INT4 ;
res = PQprepare ( conn , " index_nosectors " ,
2010-12-10 19:13:07 +03:00
" select 0::integer,count(*) from placex where rank_search = $1 and indexed_status > 0 " ,
1 , pg_prepare_params ) ;
2010-11-02 14:37:11 +03:00
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
{
fprintf ( stderr , " Failed preparing index_sectors: %s \n " , PQerrorMessage ( conn ) ) ;
exit ( EXIT_FAILURE ) ;
}
PQclear ( res ) ;
2010-10-24 03:12:37 +04:00
pg_prepare_params [ 0 ] = PG_OID_INT4 ;
pg_prepare_params [ 1 ] = PG_OID_INT4 ;
res = PQprepare ( conn , " index_sector_places " ,
2010-12-10 19:13:07 +03:00
" select place_id from placex where rank_search = $1 and geometry_sector = $2 and indexed_status > 0 " ,
2 , pg_prepare_params ) ;
2010-10-26 19:22:41 +04:00
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
{
fprintf ( stderr , " Failed preparing index_sector_places: %s \n " , PQerrorMessage ( conn ) ) ;
exit ( EXIT_FAILURE ) ;
}
2010-10-24 03:12:37 +04:00
PQclear ( res ) ;
2010-11-02 14:37:11 +03:00
pg_prepare_params [ 0 ] = PG_OID_INT4 ;
res = PQprepare ( conn , " index_nosector_places " ,
2010-12-10 19:13:07 +03:00
" select place_id from placex where rank_search = $1 and indexed_status > 0 order by geometry_sector " ,
1 , pg_prepare_params ) ;
2010-11-02 14:37:11 +03:00
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
{
fprintf ( stderr , " Failed preparing index_nosector_places: %s \n " , PQerrorMessage ( conn ) ) ;
exit ( EXIT_FAILURE ) ;
}
PQclear ( res ) ;
2010-10-24 03:12:37 +04:00
// Build the data for each thread
thread_data = ( struct index_thread_data * ) malloc ( sizeof ( struct index_thread_data ) * num_threads ) ;
2010-12-10 19:13:07 +03:00
for ( i = 0 ; i < num_threads ; i + + )
{
thread_data [ i ] . conn = PQconnectdb ( conninfo ) ;
if ( PQstatus ( thread_data [ i ] . conn ) ! = CONNECTION_OK )
{
fprintf ( stderr , " Connection to database failed: %s \n " , PQerrorMessage ( thread_data [ i ] . conn ) ) ;
exit ( EXIT_FAILURE ) ;
}
2011-06-14 17:42:46 +04:00
pg_prepare_params [ 0 ] = PG_OID_INT8 ;
2010-12-10 19:13:07 +03:00
res = PQprepare ( thread_data [ i ] . conn , " index_placex " ,
" update placex set indexed_status = 0 where place_id = $1 " ,
1 , pg_prepare_params ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
{
fprintf ( stderr , " Failed preparing index_placex: %s \n " , PQerrorMessage ( conn ) ) ;
exit ( EXIT_FAILURE ) ;
}
PQclear ( res ) ;
2012-01-21 14:21:42 +04:00
/*res = PQexec(thread_data[i].conn, "set enable_seqscan = false");
2010-12-10 19:13:07 +03:00
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
{
fprintf ( stderr , " Failed disabling sequential scan: %s \n " , PQerrorMessage ( conn ) ) ;
exit ( EXIT_FAILURE ) ;
}
2012-01-21 14:21:42 +04:00
PQclear ( res ) ; */
2010-12-10 19:13:07 +03:00
nominatim_exportCreatePreparedQueries ( thread_data [ i ] . conn ) ;
}
// Create the output file
writer = NULL ;
if ( structuredoutputfile )
{
writer = nominatim_exportXMLStart ( structuredoutputfile ) ;
}
2010-10-24 03:12:37 +04:00
2012-05-24 02:26:16 +04:00
fprintf ( stderr , " Starting indexing rank (%i to %i) using %i threads \n " , rank_min , rank_max , num_threads ) ;
2010-10-24 03:12:37 +04:00
for ( rank = rank_min ; rank < = rank_max ; rank + + )
{
2011-02-21 15:41:44 +03:00
fprintf ( stderr , " Starting rank %d \n " , rank ) ;
2010-12-10 19:13:07 +03:00
rankCountTuples = 0 ;
rankPerSecond = 0 ;
2010-10-24 03:12:37 +04:00
paramRank = PGint32 ( rank ) ;
paramValues [ 0 ] = ( char * ) & paramRank ;
paramLengths [ 0 ] = sizeof ( paramRank ) ;
paramFormats [ 0 ] = 1 ;
2011-01-21 13:40:44 +03:00
// if (rank < 16)
// resSectors = PQexecPrepared(conn, "index_nosectors", 1, paramValues, paramLengths, paramFormats, 1);
// else
2011-02-07 14:13:18 +03:00
resSectors = PQexecPrepared ( conn , " index_sectors " , 1 , paramValues , paramLengths , paramFormats , 1 ) ;
2010-10-24 03:12:37 +04:00
if ( PQresultStatus ( resSectors ) ! = PGRES_TUPLES_OK )
{
fprintf ( stderr , " index_sectors: SELECT failed: %s " , PQerrorMessage ( conn ) ) ;
PQclear ( resSectors ) ;
exit ( EXIT_FAILURE ) ;
}
2010-12-10 19:13:07 +03:00
if ( PQftype ( resSectors , 0 ) ! = PG_OID_INT4 )
{
2010-10-24 03:12:37 +04:00
fprintf ( stderr , " Sector value has unexpected type \n " ) ;
PQclear ( resSectors ) ;
exit ( EXIT_FAILURE ) ;
2010-12-10 19:13:07 +03:00
}
if ( PQftype ( resSectors , 1 ) ! = PG_OID_INT8 )
{
2010-10-24 03:12:37 +04:00
fprintf ( stderr , " Sector value has unexpected type \n " ) ;
PQclear ( resSectors ) ;
exit ( EXIT_FAILURE ) ;
2010-12-10 19:13:07 +03:00
}
rankTotalTuples = 0 ;
for ( iSector = 0 ; iSector < PQntuples ( resSectors ) ; iSector + + )
{
rankTotalTuples + = PGint64 ( * ( ( uint64_t * ) PQgetvalue ( resSectors , iSector , 1 ) ) ) ;
}
rankStartTime = time ( 0 ) ;
for ( iSector = 0 ; iSector < = PQntuples ( resSectors ) ; iSector + + )
{
if ( iSector > 0 )
{
resPlaces = PQgetResult ( conn ) ;
if ( PQresultStatus ( resPlaces ) ! = PGRES_TUPLES_OK )
{
fprintf ( stderr , " index_sector_places: SELECT failed: %s " , PQerrorMessage ( conn ) ) ;
PQclear ( resPlaces ) ;
exit ( EXIT_FAILURE ) ;
}
2011-06-14 17:42:46 +04:00
if ( PQftype ( resPlaces , 0 ) ! = PG_OID_INT8 )
2010-12-10 19:13:07 +03:00
{
fprintf ( stderr , " Place_id value has unexpected type \n " ) ;
PQclear ( resPlaces ) ;
exit ( EXIT_FAILURE ) ;
}
resNULL = PQgetResult ( conn ) ;
if ( resNULL ! = NULL )
{
fprintf ( stderr , " Unexpected non-null response \n " ) ;
exit ( EXIT_FAILURE ) ;
}
}
if ( iSector < PQntuples ( resSectors ) )
{
sector = PGint32 ( * ( ( uint32_t * ) PQgetvalue ( resSectors , iSector , 0 ) ) ) ;
2011-02-21 15:41:44 +03:00
// fprintf(stderr, "\n Starting sector %d size %ld\n", sector, PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1))));
2010-12-10 19:13:07 +03:00
// Get all the place_id's for this sector
paramRank = PGint32 ( rank ) ;
paramValues [ 0 ] = ( char * ) & paramRank ;
paramLengths [ 0 ] = sizeof ( paramRank ) ;
paramFormats [ 0 ] = 1 ;
paramSector = PGint32 ( sector ) ;
paramValues [ 1 ] = ( char * ) & paramSector ;
paramLengths [ 1 ] = sizeof ( paramSector ) ;
paramFormats [ 1 ] = 1 ;
2011-02-07 14:13:18 +03:00
if ( rankTotalTuples - rankCountTuples < num_threads * 1000 )
2011-01-21 13:40:44 +03:00
{
2011-02-07 14:13:18 +03:00
iResult = PQsendQueryPrepared ( conn , " index_nosector_places " , 1 , paramValues , paramLengths , paramFormats , 1 ) ;
2011-01-21 13:40:44 +03:00
}
2010-12-10 19:13:07 +03:00
else
2011-01-21 13:40:44 +03:00
{
2010-12-10 19:13:07 +03:00
iResult = PQsendQueryPrepared ( conn , " index_sector_places " , 2 , paramValues , paramLengths , paramFormats , 1 ) ;
2011-01-21 13:40:44 +03:00
}
2010-12-10 19:13:07 +03:00
if ( ! iResult )
{
fprintf ( stderr , " index_sector_places: SELECT failed: %s " , PQerrorMessage ( conn ) ) ;
PQclear ( resPlaces ) ;
exit ( EXIT_FAILURE ) ;
}
}
if ( iSector > 0 )
{
count = 0 ;
rankPerSecond = 0 ;
tuples = PQntuples ( resPlaces ) ;
if ( tuples > 0 )
{
// Spawn threads
for ( i = 0 ; i < num_threads ; i + + )
{
thread_data [ i ] . res = resPlaces ;
thread_data [ i ] . tuples = tuples ;
thread_data [ i ] . count = & count ;
thread_data [ i ] . count_mutex = & count_mutex ;
thread_data [ i ] . writer = writer ;
thread_data [ i ] . writer_mutex = & writer_mutex ;
pthread_create ( & thread_data [ i ] . thread , NULL , & nominatim_indexThread , ( void * ) & thread_data [ i ] ) ;
}
// Monitor threads to give user feedback
sleepcount = 0 ;
while ( count < tuples )
{
usleep ( 1000 ) ;
// Aim for one update per second
2012-03-22 04:33:28 +04:00
if ( sleepcount + + > 500 )
2010-12-10 19:13:07 +03:00
{
rankPerSecond = ( ( float ) rankCountTuples + ( float ) count ) / MAX ( difftime ( time ( 0 ) , rankStartTime ) , 1 ) ;
2011-02-21 15:41:44 +03:00
fprintf ( stderr , " Done %i in %i @ %f per second - Rank %i ETA (seconds): %f \n " , ( rankCountTuples + count ) , ( int ) ( difftime ( time ( 0 ) , rankStartTime ) ) , rankPerSecond , rank , ( ( float ) ( rankTotalTuples - ( rankCountTuples + count ) ) ) / rankPerSecond ) ;
2010-12-10 19:13:07 +03:00
sleepcount = 0 ;
}
}
// Wait for everything to finish
for ( i = 0 ; i < num_threads ; i + + )
{
pthread_join ( thread_data [ i ] . thread , NULL ) ;
}
rankCountTuples + = tuples ;
}
// Finished sector
rankPerSecond = ( float ) rankCountTuples / MAX ( difftime ( time ( 0 ) , rankStartTime ) , 1 ) ;
2011-02-21 15:41:44 +03:00
fprintf ( stderr , " Done %i in %i @ %f per second - ETA (seconds): %f \n " , rankCountTuples , ( int ) ( difftime ( time ( 0 ) , rankStartTime ) ) , rankPerSecond , ( ( float ) ( rankTotalTuples - rankCountTuples ) ) / rankPerSecond ) ;
2010-12-10 19:13:07 +03:00
PQclear ( resPlaces ) ;
}
2011-02-07 14:13:18 +03:00
if ( rankTotalTuples - rankCountTuples < num_threads * 20 & & iSector < PQntuples ( resSectors ) )
{
iSector = PQntuples ( resSectors ) - 1 ;
}
2010-12-10 19:13:07 +03:00
}
2010-10-24 03:12:37 +04:00
// Finished rank
2011-02-21 15:41:44 +03:00
fprintf ( stderr , " \r Done %i in %i @ %f per second - FINISHED \n \n " , rankCountTuples , ( int ) ( difftime ( time ( 0 ) , rankStartTime ) ) , rankPerSecond ) ;
2010-10-24 03:12:37 +04:00
PQclear ( resSectors ) ;
}
if ( writer )
{
2010-12-10 19:13:07 +03:00
nominatim_exportXMLEnd ( writer ) ;
2010-10-24 03:12:37 +04:00
}
}
void * nominatim_indexThread ( void * thread_data_in )
{
2010-12-10 19:13:07 +03:00
struct index_thread_data * thread_data = ( struct index_thread_data * ) thread_data_in ;
2011-02-07 14:13:18 +03:00
struct export_data querySet ;
2010-10-24 03:12:37 +04:00
2010-12-10 19:13:07 +03:00
PGresult * res ;
2010-10-24 03:12:37 +04:00
const char * paramValues [ 1 ] ;
int paramLengths [ 1 ] ;
int paramFormats [ 1 ] ;
2011-06-14 17:42:46 +04:00
uint64_t paramPlaceID ;
uint64_t place_id ;
2010-12-10 19:13:07 +03:00
time_t updateStartTime ;
while ( 1 )
{
pthread_mutex_lock ( thread_data - > count_mutex ) ;
if ( * ( thread_data - > count ) > = thread_data - > tuples )
{
pthread_mutex_unlock ( thread_data - > count_mutex ) ;
break ;
}
2011-06-14 17:42:46 +04:00
place_id = PGint64 ( * ( ( uint64_t * ) PQgetvalue ( thread_data - > res , * thread_data - > count , 0 ) ) ) ;
2010-12-10 19:13:07 +03:00
( * thread_data - > count ) + + ;
pthread_mutex_unlock ( thread_data - > count_mutex ) ;
2011-06-14 17:42:46 +04:00
if ( verbose ) fprintf ( stderr , " Processing place_id %ld \n " , place_id ) ;
2010-12-10 19:13:07 +03:00
updateStartTime = time ( 0 ) ;
2011-01-21 13:40:44 +03:00
int done = 0 ;
2011-02-08 15:09:11 +03:00
if ( thread_data - > writer )
{
nominatim_exportPlaceQueries ( place_id , thread_data - > conn , & querySet ) ;
}
2011-01-21 13:40:44 +03:00
while ( ! done )
{
2011-06-14 17:42:46 +04:00
paramPlaceID = PGint64 ( place_id ) ;
2011-01-21 13:40:44 +03:00
paramValues [ 0 ] = ( char * ) & paramPlaceID ;
paramLengths [ 0 ] = sizeof ( paramPlaceID ) ;
paramFormats [ 0 ] = 1 ;
res = PQexecPrepared ( thread_data - > conn , " index_placex " , 1 , paramValues , paramLengths , paramFormats , 1 ) ;
if ( PQresultStatus ( res ) = = PGRES_COMMAND_OK )
done = 1 ;
else
{
2012-01-29 03:33:28 +04:00
if ( ! strncmp ( PQerrorMessage ( thread_data - > conn ) , " ERROR: deadlock detected " , 25 ) )
2011-01-21 13:40:44 +03:00
{
2011-06-14 17:42:46 +04:00
fprintf ( stderr , " index_placex: UPDATE failed - deadlock, retrying (%ld) \n " , place_id ) ;
2011-02-08 15:09:11 +03:00
PQclear ( res ) ;
sleep ( rand ( ) % 10 ) ;
2011-01-21 13:40:44 +03:00
}
else
{
fprintf ( stderr , " index_placex: UPDATE failed: %s " , PQerrorMessage ( thread_data - > conn ) ) ;
PQclear ( res ) ;
2012-07-31 02:31:38 +04:00
exit ( EXIT_FAILURE ) ;
2011-01-21 13:40:44 +03:00
}
}
2010-10-24 03:12:37 +04:00
}
PQclear ( res ) ;
2011-06-14 17:42:46 +04:00
if ( difftime ( time ( 0 ) , updateStartTime ) > 1 ) fprintf ( stderr , " Slow place_id %ld \n " , place_id ) ;
2010-10-24 03:12:37 +04:00
if ( thread_data - > writer )
{
2011-02-07 14:13:18 +03:00
nominatim_exportPlace ( place_id , thread_data - > conn , thread_data - > writer , thread_data - > writer_mutex , & querySet ) ;
nominatim_exportFreeQueries ( & querySet ) ;
2010-10-24 03:12:37 +04:00
}
2010-12-10 19:13:07 +03:00
}
2010-10-24 03:12:37 +04:00
2010-12-10 19:13:07 +03:00
return NULL ;
2010-10-24 03:12:37 +04:00
}