Checkpoint select sparse tables.

fchirica 2021-01-07 00:20:15 +02:00 committed by Gene Hoffman
parent 19a61890b4
commit 0e55ab6719
3 changed files with 73 additions and 17 deletions
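Summary: AddressManager.select_peer finds a random occupied slot in the tried or new table by rejection sampling: draw a random (bucket, position) and re-roll until it lands on a non-empty slot. On a nearly empty table that loop can spin for a long time. This commit checkpoints the occupied slots in two sets, used_new_matrix_positions and used_tried_matrix_positions, keeps them in sync through the new _set_new_matrix / _set_tried_matrix helpers, and, whenever fewer than sqrt(buckets * bucket_size) slots are used, samples directly from the checkpoint instead. A minimal sketch of the idea (illustrative only; BUCKET_COUNT and select_position are hypothetical stand-ins for the real constants and methods):

import math
from random import randrange

# Hypothetical stand-ins for the real table constants.
BUCKET_COUNT = 1024
BUCKET_SIZE = 64

def select_position(matrix, used_positions):
    # Returns a random occupied (bucket, pos); -1 marks an empty slot.
    # Assumes at least one slot is occupied, as the real caller guarantees.
    if len(used_positions) < math.sqrt(BUCKET_COUNT * BUCKET_SIZE):
        # Sparse table: build a list of used positions once and index into it.
        positions = list(used_positions)
        return positions[randrange(len(positions))]
    # Dense table: rejection sampling terminates quickly because most probes hit.
    bucket, pos = randrange(BUCKET_COUNT), randrange(BUCKET_SIZE)
    while matrix[bucket][pos] == -1:
        bucket, pos = randrange(BUCKET_COUNT), randrange(BUCKET_SIZE)
    return bucket, pos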


@@ -169,6 +169,8 @@ class AddressManager:
     map_info: Dict[int, ExtendedPeerInfo]
     last_good: int
     tried_collisions: List[int]
+    used_new_matrix_positions: Set[Tuple[int, int]]
+    used_tried_matrix_positions: Set[Tuple[int, int]]
 
     def __init__(self):
         self.clear()
@@ -186,6 +188,40 @@ class AddressManager:
         self.map_info = {}
         self.last_good = 1
         self.tried_collisions = []
+        self.used_new_matrix_positions = set()
+        self.used_tried_matrix_positions = set()
 
+    # Use only this method for modifying the new matrix.
+    def _set_new_matrix(self, row: int, col: int, value: int):
+        self.new_matrix[row][col] = value
+        if value == -1:
+            if (row, col) in self.used_new_matrix_positions:
+                self.used_new_matrix_positions.remove((row, col))
+        else:
+            if (row, col) not in self.used_new_matrix_positions:
+                self.used_new_matrix_positions.add((row, col))
+
+    # Use only this method for modifying the tried matrix.
+    def _set_tried_matrix(self, row: int, col: int, value: int):
+        self.tried_matrix[row][col] = value
+        if value == -1:
+            if (row, col) in self.used_tried_matrix_positions:
+                self.used_tried_matrix_positions.remove((row, col))
+        else:
+            if (row, col) not in self.used_tried_matrix_positions:
+                self.used_tried_matrix_positions.add((row, col))
+
+    def load_used_table_positions(self):
+        self.used_new_matrix_positions = set()
+        self.used_tried_matrix_positions = set()
+        for bucket in range(NEW_BUCKET_COUNT):
+            for pos in range(BUCKET_SIZE):
+                if self.new_matrix[bucket][pos] != -1:
+                    self.used_new_matrix_positions.add((bucket, pos))
+        for bucket in range(TRIED_BUCKET_COUNT):
+            for pos in range(BUCKET_SIZE):
+                if self.tried_matrix[bucket][pos] != -1:
+                    self.used_tried_matrix_positions.add((bucket, pos))
+
     def create_(self, addr: TimestampedPeerInfo, addr_src: Optional[PeerInfo]) -> Tuple[ExtendedPeerInfo, int]:
         self.id_count += 1
@@ -219,7 +255,7 @@ class AddressManager:
         for bucket in range(NEW_BUCKET_COUNT):
             pos = info.get_bucket_position(self.key, True, bucket)
             if self.new_matrix[bucket][pos] == node_id:
-                self.new_matrix[bucket][pos] = -1
+                self._set_new_matrix(bucket, pos, -1)
                 info.ref_count -= 1
         assert info.ref_count == 0
         self.new_count -= 1
@@ -231,16 +267,16 @@ class AddressManager:
             assert node_id_evict in self.map_info
             old_info = self.map_info[node_id_evict]
             old_info.is_tried = False
-            self.tried_matrix[cur_bucket][cur_bucket_pos] = -1
+            self._set_tried_matrix(cur_bucket, cur_bucket_pos, -1)
             self.tried_count -= 1
             # Find its position in the new table.
             new_bucket = old_info.get_new_bucket(self.key)
             new_bucket_pos = old_info.get_bucket_position(self.key, True, new_bucket)
             self.clear_new_(new_bucket, new_bucket_pos)
             old_info.ref_count = 1
-            self.new_matrix[new_bucket][new_bucket_pos] = node_id_evict
+            self._set_new_matrix(new_bucket, new_bucket_pos, node_id_evict)
             self.new_count += 1
-        self.tried_matrix[cur_bucket][cur_bucket_pos] = node_id
+        self._set_tried_matrix(cur_bucket, cur_bucket_pos, node_id)
         self.tried_count += 1
         info.is_tried = True
@@ -250,7 +286,7 @@ class AddressManager:
             delete_info = self.map_info[delete_id]
             assert delete_info.ref_count > 0
             delete_info.ref_count -= 1
-            self.new_matrix[bucket][pos] = -1
+            self._set_new_matrix(bucket, pos, -1)
            if delete_info.ref_count == 0:
                 self.delete_new_entry_(delete_id)
@@ -371,7 +407,7 @@ class AddressManager:
                 self.clear_new_(new_bucket, new_bucket_pos)
                 info.ref_count += 1
                 if node_id is not None:
-                    self.new_matrix[new_bucket][new_bucket_pos] = node_id
+                    self._set_new_matrix(new_bucket, new_bucket_pos, node_id)
             else:
                 if info.ref_count == 0:
                     if node_id is not None:
@@ -401,28 +437,47 @@ class AddressManager:
         # Use a 50% chance for choosing between tried and new table entries.
         if not new_only and self.tried_count > 0 and (self.new_count == 0 or randrange(2) == 0):
             chance = 1.0
+            start = time.time()
+            if len(self.used_tried_matrix_positions) < math.sqrt(TRIED_BUCKET_COUNT * BUCKET_SIZE):
+                cached_tried_matrix_positions = list(self.used_tried_matrix_positions)
             while True:
-                tried_bucket = randrange(TRIED_BUCKET_COUNT)
-                tried_bucket_pos = randrange(BUCKET_SIZE)
-                while self.tried_matrix[tried_bucket][tried_bucket_pos] == -1:
-                    tried_bucket = (tried_bucket + randbits(LOG_TRIED_BUCKET_COUNT)) % TRIED_BUCKET_COUNT
-                    tried_bucket_pos = (tried_bucket_pos + randbits(LOG_BUCKET_SIZE)) % BUCKET_SIZE
+                if len(self.used_tried_matrix_positions) < math.sqrt(TRIED_BUCKET_COUNT * BUCKET_SIZE):
+                    # The table is sparse: pick uniformly from the cached list of used positions.
+                    index = randrange(len(cached_tried_matrix_positions))
+                    tried_bucket, tried_bucket_pos = cached_tried_matrix_positions[index]
+                else:
+                    # The table is dense: randomly probing positions is faster than building a positions list.
+                    tried_bucket = randrange(TRIED_BUCKET_COUNT)
+                    tried_bucket_pos = randrange(BUCKET_SIZE)
+                    while self.tried_matrix[tried_bucket][tried_bucket_pos] == -1:
+                        tried_bucket = (tried_bucket + randbits(LOG_TRIED_BUCKET_COUNT)) % TRIED_BUCKET_COUNT
+                        tried_bucket_pos = (tried_bucket_pos + randbits(LOG_BUCKET_SIZE)) % BUCKET_SIZE
                 node_id = self.tried_matrix[tried_bucket][tried_bucket_pos]
                 assert node_id != -1
                 info = self.map_info[node_id]
                 if randbits(30) < (chance * info.get_selection_chance() * (1 << 30)):
+                    end = time.time()
+                    log.info(f"address_manager.select_peer took {end - start} seconds in tried table.")
                     return info
                 chance *= 1.2
         else:
             chance = 1.0
+            start = time.time()
+            if len(self.used_new_matrix_positions) < math.sqrt(NEW_BUCKET_COUNT * BUCKET_SIZE):
+                cached_new_matrix_positions = list(self.used_new_matrix_positions)
             while True:
-                new_bucket = randrange(NEW_BUCKET_COUNT)
-                new_bucket_pos = randrange(BUCKET_SIZE)
-                while self.new_matrix[new_bucket][new_bucket_pos] == -1:
-                    new_bucket = (new_bucket + randbits(LOG_NEW_BUCKET_COUNT)) % NEW_BUCKET_COUNT
-                    new_bucket_pos = (new_bucket_pos + randbits(LOG_BUCKET_SIZE)) % BUCKET_SIZE
+                if len(self.used_new_matrix_positions) < math.sqrt(NEW_BUCKET_COUNT * BUCKET_SIZE):
+                    # The table is sparse: pick uniformly from the cached list of used positions.
+                    index = randrange(len(cached_new_matrix_positions))
+                    new_bucket, new_bucket_pos = cached_new_matrix_positions[index]
+                else:
+                    # The table is dense: randomly probing positions is faster than building a positions list.
+                    new_bucket = randrange(NEW_BUCKET_COUNT)
+                    new_bucket_pos = randrange(BUCKET_SIZE)
+                    while self.new_matrix[new_bucket][new_bucket_pos] == -1:
+                        new_bucket = (new_bucket + randbits(LOG_NEW_BUCKET_COUNT)) % NEW_BUCKET_COUNT
+                        new_bucket_pos = (new_bucket_pos + randbits(LOG_BUCKET_SIZE)) % BUCKET_SIZE
                 node_id = self.new_matrix[new_bucket][new_bucket_pos]
                 assert node_id != -1
                 info = self.map_info[node_id]
                 if randbits(30) < chance * info.get_selection_chance() * (1 << 30):
+                    end = time.time()
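Why sqrt as the cutoff: with N = buckets * bucket_size total slots and k of them occupied, uniform rejection sampling needs about N / k probes in expectation, while the sparse path pays O(k) to build the cached list. The crossover is at k = sqrt(N); below it, N / k exceeds k and the cached list wins. A quick back-of-envelope check (numbers are illustrative, not taken from the commit):

N = 1024 * 64  # total slots (hypothetical table size)
for k in (16, 256, 1024, 16384):  # occupied slots
    print(f"k={k}: ~{N / k:.0f} expected probes vs {k} to build the list")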


@@ -196,4 +196,5 @@ class AddressManagerStore:
         for node_id, info in list(address_manager.map_info.items()):
             if not info.is_tried and info.ref_count == 0:
                 address_manager.delete_new_entry_(node_id)
+        address_manager.load_used_table_positions()
         return address_manager
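The checkpoint sets are derived state, so AddressManagerStore does not persist them; it rebuilds them with load_used_table_positions() once the tables are deserialized and stale new-table entries are pruned. A small invariant check one could run after loading (illustrative; check_checkpoint is a hypothetical helper, not part of the commit):

def check_checkpoint(matrix, used_positions):
    # The checkpoint must equal a full scan of the matrix; -1 marks empty slots.
    scanned = set()
    for bucket, row in enumerate(matrix):
        for pos, value in enumerate(row):
            if value != -1:
                scanned.add((bucket, pos))
    assert scanned == used_positions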


@@ -103,7 +103,7 @@ class FullNodeDiscovery:
             self.relay_queue.put_nowait((timestamped_peer_info, 1))
 
     # Updates timestamps each time we receive a message for outbound connections.
-    async def update_peer_timestamp_on_message(self, peer: WSChiaConnection):
+    async def update_peer_timestamp_on_message(self, peer: ws.WSChiaConnection):
         if (
             peer.is_outbound
             and peer.peer_server_port is not None
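The discovery change only qualifies the type hint through its module (ws.WSChiaConnection instead of WSChiaConnection). A common reason for this pattern, assumed here rather than stated in the commit, is to reference the class through an already-imported module instead of importing the name directly, which sidesteps circular-import failures. Sketched with an assumed module path:

# Import the module rather than the class; the exact path is an assumption.
import chia.server.ws_connection as ws

class FullNodeDiscovery:
    async def update_peer_timestamp_on_message(self, peer: ws.WSChiaConnection):
        ...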