unison/unison-src/dindex.u

89 lines
3.2 KiB
Plaintext

-- A distributed index, using Highest Random Weight (HRW) hashing
-- to pick which nodes are responsible for which keys. See:
-- https://en.wikipedia.org/wiki/Rendezvous_hashing
DIndex.Replication-Factor = 1
DIndex.Timeout = Duration.seconds 10
DIndex.Max-Timeout = Duration.seconds 500
alias DIndex k v = Index Node (Index k v)
DIndex.empty : ∀ k v . Remote (DIndex k v)
DIndex.empty = Index.empty
-- Pick the nodes responsible for a key, using HRW hashing
DIndex.nodesForKey : ∀ k v . k -> DIndex k v -> Remote (Vector Node)
DIndex.nodesForKey k ind = do Remote
nodes := Index.keys ind
hashes := Remote.traverse (node -> hash! (node, k)) nodes
(nodes `Vector.zip` hashes)
|> Vector.sort-by Hash.Order 2nd
|> Vector.take DIndex.Replication-Factor
|> Vector.map 1st
|> pure
DIndex.lookup : ∀ k v . k -> DIndex k v -> Remote (Optional v)
DIndex.lookup k ind = do Remote
nodes := DIndex.nodesForKey k ind
localLookup = node -> do Remote
nind := Index.lookup node ind
-- on slim chance that a Node is removed from the cluster just before
-- we do the lookup, it gets treated like a timeout
Optional.fold (Remote.map (const None) (Remote.sleep DIndex.Timeout))
(Index.lookup k)
nind
-- todo: use Remote.quorum here
Remote.race DIndex.Timeout <| Vector.map localLookup nodes
DIndex.insert : ∀ k v . k -> v -> DIndex k v -> Remote Unit
DIndex.insert k v ind = do Remote
nodes := DIndex.nodesForKey k ind
localInsert = node -> do Remote
nind := Index.lookup node ind
Optional.fold (Remote.map (const Unit) (Remote.sleep DIndex.Timeout))
(Index.insert k v)
nind
Remote.race DIndex.Timeout <| Vector.map localInsert nodes
DIndex.join : ∀ k v . DIndex k v -> Remote Unit
DIndex.join ind = do Remote
here := Remote.here
localInd := Index.empty
Index.insert here localInd ind
DIndex.indicesForKey : ∀ k v . k -> DIndex k v -> Remote (Vector (Index k v))
DIndex.indicesForKey k ind = do Remote
nodes := DIndex.nodesForKey k ind
indices := Remote.traverse (node -> Index.lookup node ind) nodes
pure (Optional.somes indices)
DIndex.rebalance : ∀ k v . k -> DIndex k v -> Remote Unit
DIndex.rebalance k ind = do Remote
indices := DIndex.indicesForKey k ind
t = DIndex.Timeout
results := Remote.parallel-traverse DIndex.Max-Timeout (Index.lookup k `and-then` Remote.timeout t) indices
resultsHashes := Remote.traverse hash! results
uh := hash! None
hd = uh `Optional.get-or` Vector.at 0 resultsHashes
eq = h1 h2 -> Hash.erase h1 ==_Hash Hash.erase h2
if Vector.all? (eq hd) resultsHashes
-- all results matched, we're good
then pure Unit
-- not all results matched, reinsert
else do Remote
ov := DIndex.lookup k ind
Optional.fold (pure Unit)
(v -> DIndex.insert k v ind)
ov
DIndex.leave : ∀ k v . Node -> DIndex k v -> Remote Unit
DIndex.leave node ind = do Remote
local-ind := Index.lookup node ind
Index.delete node ind
Optional.fold
(pure Unit)
(local-ind -> do Remote {
keys := Index.keys local-ind;
Remote.fork <| Remote.traverse (k -> DIndex.rebalance k ind) keys })
local-ind