fix SQL and some other stuff

This commit is contained in:
Sarah Hoffmann 2020-01-20 00:05:28 +01:00
parent 6c0d6d3178
commit 4a9502bf88

View File

@ -96,18 +96,16 @@ class Indexer(object):
self.conn = make_connection(options)
self.threads = []
self.poll = select.poll()
for i in range(options.threads):
t = IndexingThread(i, options)
self.threads.append(t)
self.poll.register(t, select.EPOLLIN)
def run(self):
log.info("Starting indexing rank ({} to {}) using {} threads".format(
self.options.minrank, self.options.maxrank,
self.options.threads))
for rank in range(self.options.minrank, 30):
for rank in range(self.options.minrank, min(self.options.maxrank, 30)):
self.index(RankRunner(rank))
if self.options.maxrank >= 30:
@ -117,7 +115,7 @@ class Indexer(object):
def index(self, obj):
log.info("Starting {}".format(obj.name()))
cur = self.conn.cursor(name="main")
cur = self.conn.cursor(name='main')
cur.execute(obj.sql_index_sectors())
total_tuples = 0
@ -130,25 +128,29 @@ class Indexer(object):
next_thread = self.find_free_thread()
done_tuples = 0
rank_start_time = datetime.now()
sector_sql = obj.sql_sector_places()
index_sql = obj.sql_index_place()
min_grouped_tuples = total_tuples - len(self.threads) * 1000
for r in cur:
sector = r[0]
# Should we do the remaining ones together?
do_all = total_tuples - done_tuples < len(self.threads) * 1000
do_all = done_tuples > min_grouped_tuples
pcur = self.conn.cursor(name='places')
if do_all:
pcur.execute(obj.sql_nosector_places())
else:
pcur.execute(obj.sql_sector_places(), (sector, ))
pcur.execute(sector_sql, (sector, ))
for place in pcur:
place_id = place[0]
log.debug("Processing place {}".format(place_id))
thread = next(next_thread)
thread.perform(obj.sql_index_place(), (place_id,))
thread.perform(index_sql, (place_id,))
done_tuples += 1
pcur.close()
@ -164,24 +166,19 @@ class Indexer(object):
rank_end_time = datetime.now()
diff_seconds = (rank_end_time-rank_start_time).total_seconds()
log.info("Done {} in {} @ {} per second - FINISHED {}\n".format(
done_tuples, int(diff_seconds),
log.info("Done {}/{} in {} @ {} per second - FINISHED {}\n".format(
done_tuples, total_tuples, int(diff_seconds),
done_tuples/diff_seconds, obj.name()))
def find_free_thread(self):
    """Generate worker threads that can accept the next statement.

    This is an endless generator: callers pull one thread at a time with
    ``next()`` and must never expect it to be exhausted.

    Yields:
        An element of ``self.threads`` for which ``is_done()`` returned
        a true value.
    """
    # On the first pass every thread is a candidate.
    ready = self.threads

    while True:
        for thread in ready:
            # Being readable is not treated as sufficient on its own;
            # is_done() (defined elsewhere) has the final say.
            # NOTE(review): presumably it reports that the thread's
            # pending statement has completed -- confirm.
            if thread.is_done():
                yield thread

        # Block until at least one thread becomes readable. select()
        # accepts the thread objects directly because they provide a
        # fileno() method.
        ready, _, _ = select.select(self.threads, [], [])

    # Not reached: the loop above never exits. The former
    # `assert(False, "Unreachable code")` tested a non-empty tuple, which
    # is always true, so it could never have fired and was removed.
@ -212,8 +209,8 @@ class RankRunner(object):
def sql_sector_places(self):
    """Return the SQL that lists the places of one sector awaiting indexing.

    The rank stored in ``self.rank`` is formatted directly into the
    statement, while the sector stays a ``%s`` placeholder that the caller
    supplies at execution time.

    Returns:
        str: a SELECT on ``placex`` returning ``place_id`` for rows with
        ``indexed_status > 0`` that match this rank and the given sector.
    """
    return """SELECT place_id FROM placex
              WHERE indexed_status > 0 and rank_search = {}
                    and geometry_sector = %s""".format(self.rank)
def sql_index_place(self):
    """Return the SQL that indexes a single place.

    The statement executes ``rnk_index`` with one ``%s`` placeholder, for
    which the caller supplies the place id.
    NOTE(review): ``rnk_index`` is presumably a prepared statement set up
    elsewhere -- confirm.
    """
    return "EXECUTE rnk_index(%s)"