unison/unison-src/dindex.u

98 lines
3.4 KiB
Plaintext

-- A distributed index, using Highest Random Weight (HRW) hashing
-- to pick which nodes are responsible for which keys. See:
-- https://en.wikipedia.org/wiki/Rendezvous_hashing
-- How many nodes DIndex.nodesForKey keeps for each key (it is the argument
-- to Vector.take there).
DIndex.Replication-Factor = 3;
-- Ten-second duration used as the per-node delay/timeout in DIndex.lookup,
-- DIndex.insert and DIndex.rebalance.
DIndex.Timeout = Duration.seconds 10;
-- 500-second duration handed to Remote.parallel-traverse in DIndex.rebalance.
DIndex.Max-Timeout = Duration.seconds 500;
-- A fresh distributed index: an Index keyed by Node whose values are the
-- per-node `Index k v`. It is simply `Index.empty` at that type.
DIndex.empty : ∀ k v . Remote (Index Node (Index k v));
DIndex.empty = Index.empty;
-- Choose the nodes responsible for key `k` (HRW hashing): hash every
-- (node, key) pair, order the nodes by that hash using Hash.Order, and keep
-- the first DIndex.Replication-Factor of them.
DIndex.nodesForKey : ∀ k v . k -> Index Node (Index k v) -> Remote (Vector Node);
DIndex.nodesForKey k ind = do Remote
  members := Index.keys ind;
  weights := Remote.traverse (member -> hash! (member, k)) members;
  -- pair each node with its weight, rank by the weight (2nd component) ...
  weighted = Vector.zip members weights;
  ranked = Vector.sort Hash.Order 2nd weighted;
  -- ... then keep the leading entries and drop the weights again
  chosen = Vector.take DIndex.Replication-Factor ranked;
  pure (Vector.map 1st chosen);;
;
-- Look up key `k` in the distributed index `ind`.
--
-- The nodes responsible for `k` are computed with DIndex.nodesForKey, each
-- one's local index is queried, and the first value found (in the order
-- returned by DIndex.nodesForKey) is returned. The result is None only when
-- no queried replica produced a value.
DIndex.lookup : ∀ k v . k -> Index Node (Index k v) -> Remote (Optional v);
DIndex.lookup k ind = do Remote
  nodes := DIndex.nodesForKey k ind;
  -- Query a single node: fetch that node's local index out of `ind` and look
  -- `k` up in it.
  localLookup = node -> (do Remote
    nind := Index.lookup node ind;
    -- on slim chance that a Node is removed from the cluster just before
    -- we do the lookup, it gets treated like a timeout
    Optional.fold (Remote.map (const None) (Remote.delay DIndex.Timeout))
                  (Index.lookup k)
                  nind;;)
  ;
  -- todo: use Remote.quorum here
  -- Remote.race DIndex.Timeout <| Vector.map localLookup nodes;;
  rs := Remote.traverse localLookup nodes;
  -- Every replica has already been queried, so use all of the answers: keep
  -- only the hits and return the first one. Looking at replica 0 alone would
  -- report the key as missing whenever the top-ranked node lacks it (for
  -- example a node that only just became responsible for `k`), even though
  -- another replica holds the value -- which is exactly the situation
  -- DIndex.rebalance relies on this function to recover from.
  pure (Vector.at 0 (Optional.somes rs));;
;
-- Store `v` under key `k`: build one insert action per node returned by
-- DIndex.nodesForKey and hand the whole batch to Remote.race together with
-- DIndex.Timeout.
DIndex.insert : ∀ k v . k -> v -> Index Node (Index k v) -> Remote Unit;
DIndex.insert k v ind = do Remote
  targets := DIndex.nodesForKey k ind;
  -- Insert into one node's local index. If the node is no longer present in
  -- `ind`, wait out DIndex.Timeout and yield Unit instead.
  writeAt = target -> (do Remote
    shard := Index.lookup target ind;
    Optional.fold (Remote.map (const Unit) (Remote.delay DIndex.Timeout))
                  (Index.insert k v)
                  shard;;)
  ;
  Remote.race DIndex.Timeout (Vector.map writeAt targets);;
;
-- Add the current node (Remote.here) to the distributed index `ind`,
-- registering a newly created empty local index under it.
DIndex.join : ∀ k v . Index Node (Index k v) -> Remote Unit;
DIndex.join ind = do Remote
  self := Remote.here;
  fresh := Index.empty;
  Index.insert self fresh ind;;
;
-- Collect the local indices of the nodes responsible for key `k`. Nodes
-- returned by DIndex.nodesForKey that have no entry in `ind` are skipped.
DIndex.indicesForKey : ∀ k v . k -> Index Node (Index k v) -> Remote (Vector (Index k v));
DIndex.indicesForKey k ind = do Remote
  owners := DIndex.nodesForKey k ind;
  found := Remote.traverse (owner -> Index.lookup owner ind) owners;
  -- drop the None entries, keep the indices that were actually present
  pure (Optional.somes found);;
;
-- Check that the replicas responsible for key `k` agree, and reinsert the
-- key when they do not.
--
-- Each responsible local index is asked for `k` (each lookup wrapped with
-- Remote.timeout DIndex.Timeout, the whole batch run through
-- Remote.parallel-traverse with DIndex.Max-Timeout). The answers are hashed
-- and compared against the first answer's hash; when there are no answers
-- the hash of None is used as the reference instead.
DIndex.rebalance : ∀ k v . k -> Index Node (Index k v) -> Remote Unit;
DIndex.rebalance k ind = do Remote
  shards := DIndex.indicesForKey k ind;
  answers := Remote.parallel-traverse DIndex.Max-Timeout (Index.lookup k `then` Remote.timeout DIndex.Timeout) shards;
  answerHashes := Remote.traverse hash! answers;
  noneHash := hash! None;
  -- reference hash: the first answer's hash, or hash of None if there is none
  reference = Optional.getOr noneHash (Vector.at 0 answerHashes);
  sameHash = a b -> Hash.equal (Hash.erase a) (Hash.erase b);
  if (Vector.all? (sameHash reference) answerHashes)
    -- every replica gave the same answer, nothing to do
    (pure Unit)
    -- replicas disagree: fetch the value through DIndex.lookup and, if one
    -- is found, write it back through DIndex.insert
    (do Remote
      current := DIndex.lookup k ind;
      Optional.fold (pure Unit)
                    (v -> DIndex.insert k v ind)
                    current;;)
  ;;
;
-- Remove `node` from the distributed index `ind`. The node's local index is
-- read before the entry is deleted; if it existed, a forked task then runs
-- DIndex.rebalance for every key that index held.
DIndex.leave : ∀ k v . Node -> Index Node (Index k v) -> Remote Unit;
DIndex.leave node ind = do Remote
  departing := Index.lookup node ind;
  Index.delete node ind;
  Optional.fold
    -- the node had no entry: nothing to rebalance
    (pure Unit)
    (shard -> do Remote
      orphaned := Index.keys shard;
      Remote.fork (Remote.traverse (key -> DIndex.rebalance key ind) orphaned);;)
    departing;;
;