-- A distributed index, using Highest Random Weight (HRW) hashing
-- to pick which nodes are responsible for which keys. See:
-- https://en.wikipedia.org/wiki/Rendezvous_hashing
-- Number of nodes selected per key: `DIndex.nodesForKey` keeps this many
-- entries from its hash-sorted list of nodes.
DIndex.Replication-Factor = 1;
-- Time limit used for the races in `DIndex.lookup` and `DIndex.insert`, for
-- each per-index lookup in `DIndex.rebalance`, and as the sleep duration
-- when a node's local index cannot be found.
DIndex.Timeout = Duration.seconds 10;
-- Limit passed to the parallel traversal in `DIndex.rebalance`.
DIndex.Max-Timeout = Duration.seconds 500;
-- A distributed index: an outer index keyed by `Node`, whose values are the
-- per-node `Index k v` holding the actual key/value entries.
alias DIndex k v = Index Node (Index k v);
-- Create a new distributed index. This is just `Index.empty` at the outer
-- (node-keyed) level; nodes are added to it with `DIndex.join`.
DIndex.empty : ∀ k v . Remote (DIndex k v);
DIndex.empty = Index.empty;
-- Choose the nodes responsible for key `k` using HRW (rendezvous) hashing:
-- every node currently in the index is paired with the hash of (node, k),
-- the pairs are sorted by that hash, and the first
-- `DIndex.Replication-Factor` nodes of the sorted order are returned.
-- NOTE(review): whether "first" means lowest or highest hash depends on
-- `Vector.sort-by` / `Hash.Order` — confirm.
DIndex.nodesForKey : ∀ k v . k -> DIndex k v -> Remote (Vector Node);
DIndex.nodesForKey k ind = do Remote
  candidates := Index.keys ind;
  weights := Remote.traverse (candidate -> hash! (candidate, k)) candidates;
  -- rank (node, hash) pairs by their hash component
  ranked = Vector.sort-by Hash.Order 2nd (Vector.zip candidates weights);
  chosen = Vector.take DIndex.Replication-Factor ranked;
  -- drop the hashes, keeping only the nodes
  pure (Vector.map 1st chosen);;
-- Look up key `k` in the distributed index.
-- The nodes responsible for `k` are computed with `DIndex.nodesForKey`; a
-- lookup is issued against each of their local indices and the lookups are
-- raced under `DIndex.Timeout`.
-- NOTE(review): presumably `Remote.race` yields the first lookup to finish
-- within the timeout — confirm against its definition.
DIndex.lookup : ∀ k v . k -> DIndex k v -> Remote (Optional v);
DIndex.lookup k ind = do Remote
  nodes := DIndex.nodesForKey k ind;
  localLookup = node -> (do Remote
    nind := Index.lookup node ind;
    -- on slim chance that a Node is removed from the cluster just before
    -- we do the lookup, it gets treated like a timeout
    Optional.fold (Remote.map (const None) (Remote.sleep DIndex.Timeout))
                  (Index.lookup k)
                  -- scrutinee: the node's local index, if it still exists
                  nind;;);
  -- todo: use Remote.quorum here
  Remote.race DIndex.Timeout <| Vector.map localLookup nodes;;
-- Insert the binding `k -> v` into the distributed index.
-- The nodes responsible for `k` are computed with `DIndex.nodesForKey`; an
-- insert is issued against each of their local indices and the inserts are
-- raced under `DIndex.Timeout`.
-- NOTE(review): if `Remote.race` stops at the first completed insert, then
-- with a replication factor above 1 not every replica is guaranteed to be
-- written — confirm against its definition.
DIndex.insert : ∀ k v . k -> v -> DIndex k v -> Remote Unit;
DIndex.insert k v ind = do Remote
  nodes := DIndex.nodesForKey k ind;
  localInsert = node -> (do Remote
    nind := Index.lookup node ind;
    -- if the node's local index is gone, sleep for the timeout and give up,
    -- mirroring the missing-node handling in `DIndex.lookup`
    Optional.fold (Remote.map (const Unit) (Remote.sleep DIndex.Timeout))
                  (Index.insert k v)
                  -- scrutinee: the node's local index, if it still exists
                  nind;;);
  Remote.race DIndex.Timeout <| Vector.map localInsert nodes;;
-- Add the current node (`Remote.here`) to the distributed index, associating
-- it with a newly created empty local index.
DIndex.join : ∀ k v . DIndex k v -> Remote Unit;
DIndex.join ind = do Remote
  self := Remote.here;
  shard := Index.empty;
  Index.insert self shard ind;;
-- Fetch the local indices of the nodes responsible for key `k`.
-- Each responsible node is looked up in the outer index; the optional
-- results are collapsed with `Optional.somes`, so nodes without an entry do
-- not contribute to the returned vector.
DIndex.indicesForKey : ∀ k v . k -> DIndex k v -> Remote (Vector (Index k v));
DIndex.indicesForKey k ind = do Remote
  owners := DIndex.nodesForKey k ind;
  found := Remote.traverse (owner -> Index.lookup owner ind) owners;
  found |> Optional.somes |> pure;;
-- Check that the nodes responsible for key `k` agree on its value, and
-- reinsert the value when they do not.
-- Each responsible local index is queried for `k` (each query limited by
-- `DIndex.Timeout`, the whole traversal by `DIndex.Max-Timeout`), the
-- results are hashed, and the hashes are compared against the first one.
-- On any mismatch the key is looked up through `DIndex.lookup` and, if a
-- value is found, written back with `DIndex.insert`.
DIndex.rebalance : ∀ k v . k -> DIndex k v -> Remote Unit;
DIndex.rebalance k ind = do Remote
  indices := DIndex.indicesForKey k ind;
  t = DIndex.Timeout;
  results := Remote.parallel-traverse DIndex.Max-Timeout (Index.lookup k `and-then` Remote.timeout t) indices;
  resultsHashes := Remote.traverse hash! results;
  -- fall back to the hash of `None` when there are no results to compare
  uh := hash! None;
  hd = uh `Optional.get-or` Vector.at 0 resultsHashes;
  eq = h1 h2 -> Hash.erase h1 ==_Hash Hash.erase h2;
  if Vector.all? (eq hd) resultsHashes
  -- all results matched, we're good
  then pure Unit
  -- not all results matched, reinsert
  else (do Remote
    ov := DIndex.lookup k ind;
    Optional.fold (pure Unit)
                  (v -> DIndex.insert k v ind)
                  -- scrutinee: the value found by the fresh lookup, if any
                  ov;;);;
-- Remove `node` from the distributed index and rebalance the keys that were
-- held in its local index.
-- NOTE(review): as written, the two handlers below are not applied by any
-- function and the looked-up `local-ind` is never consumed. Elsewhere in
-- this file such handler pairs are the first two arguments of
-- `Optional.fold`. Presumably an `Optional.fold` head and a trailing
-- `local-ind` scrutinee (plus the closing `;;`) are missing here — confirm
-- against the original source.
DIndex.leave : ∀ k v . Node -> DIndex k v -> Remote Unit;
DIndex.leave node ind = do Remote
-- fetch the node's local index before its entry is deleted
local-ind := Index.lookup node ind;
Index.delete node ind;
-- handler for "node had no local index": nothing to rebalance
(pure Unit)
-- handler for "node had a local index": rebalance every key it held.
-- The traversal is wrapped in `Remote.fork`; presumably that runs it without
-- blocking this call — confirm.
(local-ind -> do Remote
keys := Index.keys local-ind;
Remote.fork <| Remote.traverse (k -> DIndex.rebalance k ind) keys;;)