mirror of
https://github.com/unisonweb/unison.git
synced 2024-11-14 16:28:34 +03:00
89 lines
3.2 KiB
Plaintext
89 lines
3.2 KiB
Plaintext
-- A distributed index, using Highest Random Weight (HRW) hashing
|
|
-- to pick which nodes are responsible for which keys. See:
|
|
-- https://en.wikipedia.org/wiki/Rendezvous_hashing
|
|
|
|
DIndex.Replication-Factor = 1
|
|
DIndex.Timeout = Duration.seconds 10
|
|
DIndex.Max-Timeout = Duration.seconds 500
|
|
|
|
alias DIndex k v = Index Node (Index k v)
|
|
|
|
DIndex.empty : ∀ k v . Remote (DIndex k v)
|
|
DIndex.empty = Index.empty
|
|
|
|
-- Pick the nodes responsible for a key, using HRW hashing
|
|
DIndex.nodesForKey : ∀ k v . k -> DIndex k v -> Remote (Vector Node)
|
|
DIndex.nodesForKey k ind = do Remote
|
|
nodes := Index.keys ind
|
|
hashes := Remote.traverse (node -> hash! (node, k)) nodes
|
|
(nodes `Vector.zip` hashes)
|
|
|> Vector.sort-by Hash.Order 2nd
|
|
|> Vector.take DIndex.Replication-Factor
|
|
|> Vector.map 1st
|
|
|> pure
|
|
|
|
DIndex.lookup : ∀ k v . k -> DIndex k v -> Remote (Optional v)
|
|
DIndex.lookup k ind = do Remote
|
|
nodes := DIndex.nodesForKey k ind
|
|
localLookup = node -> do Remote
|
|
nind := Index.lookup node ind
|
|
-- on slim chance that a Node is removed from the cluster just before
|
|
-- we do the lookup, it gets treated like a timeout
|
|
Optional.fold (Remote.map (const None) (Remote.sleep DIndex.Timeout))
|
|
(Index.lookup k)
|
|
nind
|
|
-- todo: use Remote.quorum here
|
|
Remote.race DIndex.Timeout <| Vector.map localLookup nodes
|
|
|
|
DIndex.insert : ∀ k v . k -> v -> DIndex k v -> Remote Unit
|
|
DIndex.insert k v ind = do Remote
|
|
nodes := DIndex.nodesForKey k ind
|
|
localInsert = node -> do Remote
|
|
nind := Index.lookup node ind
|
|
Optional.fold (Remote.map (const Unit) (Remote.sleep DIndex.Timeout))
|
|
(Index.insert k v)
|
|
nind
|
|
Remote.race DIndex.Timeout <| Vector.map localInsert nodes
|
|
|
|
DIndex.join : ∀ k v . DIndex k v -> Remote Unit
|
|
DIndex.join ind = do Remote
|
|
here := Remote.here
|
|
localInd := Index.empty
|
|
Index.insert here localInd ind
|
|
|
|
DIndex.indicesForKey : ∀ k v . k -> DIndex k v -> Remote (Vector (Index k v))
|
|
DIndex.indicesForKey k ind = do Remote
|
|
nodes := DIndex.nodesForKey k ind
|
|
indices := Remote.traverse (node -> Index.lookup node ind) nodes
|
|
pure (Optional.somes indices)
|
|
|
|
DIndex.rebalance : ∀ k v . k -> DIndex k v -> Remote Unit
|
|
DIndex.rebalance k ind = do Remote
|
|
indices := DIndex.indicesForKey k ind
|
|
t = DIndex.Timeout
|
|
results := Remote.parallel-traverse DIndex.Max-Timeout (Index.lookup k `and-then` Remote.timeout t) indices
|
|
resultsHashes := Remote.traverse hash! results
|
|
uh := hash! None
|
|
hd = uh `Optional.get-or` Vector.at 0 resultsHashes
|
|
eq = h1 h2 -> Hash.erase h1 ==_Hash Hash.erase h2
|
|
if Vector.all? (eq hd) resultsHashes
|
|
-- all results matched, we're good
|
|
then pure Unit
|
|
-- not all results matched, reinsert
|
|
else do Remote
|
|
ov := DIndex.lookup k ind
|
|
Optional.fold (pure Unit)
|
|
(v -> DIndex.insert k v ind)
|
|
ov
|
|
|
|
DIndex.leave : ∀ k v . Node -> DIndex k v -> Remote Unit
|
|
DIndex.leave node ind = do Remote
|
|
local-ind := Index.lookup node ind
|
|
Index.delete node ind
|
|
Optional.fold
|
|
(pure Unit)
|
|
(local-ind -> do Remote {
|
|
keys := Index.keys local-ind;
|
|
Remote.fork <| Remote.traverse (k -> DIndex.rebalance k ind) keys })
|
|
local-ind
|