2021-01-04 01:59:25 +03:00
|
|
|
// Package dag contains the base common code to define an entity stored
|
|
|
|
// in a chain of git objects, supporting actions like Push, Pull and Merge.
|
|
|
|
package dag
|
2020-12-21 13:10:43 +03:00
|
|
|
|
|
|
|
import (
|
|
|
|
"encoding/json"
|
|
|
|
"fmt"
|
|
|
|
"sort"
|
|
|
|
|
|
|
|
"github.com/pkg/errors"
|
|
|
|
|
2021-01-04 01:59:25 +03:00
|
|
|
"github.com/MichaelMure/git-bug/entity"
|
|
|
|
"github.com/MichaelMure/git-bug/identity"
|
2020-12-21 13:10:43 +03:00
|
|
|
"github.com/MichaelMure/git-bug/repository"
|
2020-12-21 20:42:04 +03:00
|
|
|
"github.com/MichaelMure/git-bug/util/lamport"
|
2020-12-21 13:10:43 +03:00
|
|
|
)
|
|
|
|
|
2020-12-21 20:42:04 +03:00
|
|
|
// refsPattern is the naming scheme of the git refs storing entities: refs/<namespace>/<entity id>.
const refsPattern = "refs/%s/%s"

// creationClockPattern names the per-namespace lamport clock incremented when an entity is created.
const creationClockPattern = "%s-create"

// editClockPattern names the per-namespace lamport clock incremented on every edit.
const editClockPattern = "%s-edit"
|
|
|
|
|
2020-12-25 13:38:01 +03:00
|
|
|
// Definition hold the details defining one specialization of an Entity.
type Definition struct {
	// the name of the entity (bug, pull-request, ...), used in error messages
	typename string
	// the namespace in git (bugs, prs, ...), used to build refs and clock names
	namespace string
	// a function decoding a JSON message into an Operation
	operationUnmarshaler func(author identity.Interface, raw json.RawMessage) (Operation, error)
	// a function loading an identity.Identity from its Id
	identityResolver identity.Resolver
	// the expected format version number, that can be used for data migration/upgrade
	formatVersion uint
}
|
|
|
|
|
2021-01-04 01:59:25 +03:00
|
|
|
// Entity is a data structure stored in a chain of git objects, supporting actions like Push, Pull and Merge.
type Entity struct {
	// Definition carries the specialization details (typename, namespace, unmarshaler, ...).
	Definition

	// operations that are already stored in the repository
	ops []Operation
	// operations not yet stored in the repository
	staging []Operation

	// TODO: add here createTime and editTime

	// // TODO: doesn't seems to actually be useful over the topological sort ? Timestamp can be generated from graph depth
	// // TODO: maybe EditTime is better because it could spread ops in consecutive groups on the logical timeline --> avoid interleaving
	// packClock lamport.Clock

	// hash of the last git commit written for this entity; empty ("") when nothing
	// has been committed yet (see Commit, which branches on this)
	lastCommit repository.Hash
}
|
|
|
|
|
2021-01-04 01:59:25 +03:00
|
|
|
// New create an empty Entity
|
2020-12-21 13:10:43 +03:00
|
|
|
func New(definition Definition) *Entity {
|
|
|
|
return &Entity{
|
|
|
|
Definition: definition,
|
2021-01-04 01:59:25 +03:00
|
|
|
// packClock: lamport.NewMemClock(),
|
2020-12-21 13:10:43 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-01-25 14:39:34 +03:00
|
|
|
// Read will read and decode a stored local Entity from a repository
|
2021-01-04 01:59:25 +03:00
|
|
|
func Read(def Definition, repo repository.ClockedRepo, id entity.Id) (*Entity, error) {
|
2020-12-21 13:10:43 +03:00
|
|
|
if err := id.Validate(); err != nil {
|
|
|
|
return nil, errors.Wrap(err, "invalid id")
|
|
|
|
}
|
|
|
|
|
|
|
|
ref := fmt.Sprintf("refs/%s/%s", def.namespace, id.String())
|
|
|
|
|
2020-12-25 13:38:01 +03:00
|
|
|
return read(def, repo, ref)
|
|
|
|
}
|
|
|
|
|
2021-01-25 14:39:34 +03:00
|
|
|
// readRemote will read and decode a stored remote Entity from a repository
|
|
|
|
func readRemote(def Definition, repo repository.ClockedRepo, remote string, id entity.Id) (*Entity, error) {
|
|
|
|
if err := id.Validate(); err != nil {
|
|
|
|
return nil, errors.Wrap(err, "invalid id")
|
|
|
|
}
|
|
|
|
|
|
|
|
ref := fmt.Sprintf("refs/remotes/%s/%s/%s", def.namespace, remote, id.String())
|
|
|
|
|
|
|
|
return read(def, repo, ref)
|
|
|
|
}
|
|
|
|
|
2020-12-25 13:38:01 +03:00
|
|
|
// read fetch from git and decode an Entity at an arbitrary git reference.
//
// The load happens in four phases:
//  1. depth-first search from the ref's commit to collect all ancestor commits,
//  2. walk that order backward (root first), decoding each operationPack and
//     checking that lamport clocks respect the DAG topology,
//  3. witness the observed clocks in the repository,
//  4. totally order the operationPacks by clock (ignoring topology) and
//     flatten their operations.
func read(def Definition, repo repository.ClockedRepo, ref string) (*Entity, error) {
	rootHash, err := repo.ResolveRef(ref)
	if err != nil {
		return nil, err
	}

	// Perform a depth-first search to get a topological order of the DAG where we discover the
	// parents commit and go back in time up to the chronological root

	stack := make([]repository.Hash, 0, 32)
	visited := make(map[repository.Hash]struct{})
	DFSOrder := make([]repository.Commit, 0, 32)

	stack = append(stack, rootHash)

	for len(stack) > 0 {
		// pop
		hash := stack[len(stack)-1]
		stack = stack[:len(stack)-1]

		// a commit can be reached through several paths (merges); read it only once
		if _, ok := visited[hash]; ok {
			continue
		}

		// mark as visited
		visited[hash] = struct{}{}

		commit, err := repo.ReadCommit(hash)
		if err != nil {
			return nil, err
		}

		DFSOrder = append(DFSOrder, commit)

		for _, parent := range commit.Parents {
			stack = append(stack, parent)
		}
	}

	// Now, we can reverse this topological order and read the commits in an order where
	// we are sure to have read all the chronological ancestors when we read a commit.

	// Next step is to:
	// 1) read the operationPacks
	// 2) make sure that the clocks causality respect the DAG topology.

	oppMap := make(map[repository.Hash]*operationPack)
	var opsCount int
	// var packClock = lamport.NewMemClock()

	// iterate from the end of DFSOrder: the last discovered commit is the root
	for i := len(DFSOrder) - 1; i >= 0; i-- {
		commit := DFSOrder[i]
		isFirstCommit := i == len(DFSOrder)-1
		isMerge := len(commit.Parents) > 1

		// Verify DAG structure: single chronological root, so only the root
		// can have no parents. Said otherwise, the DAG need to have exactly
		// one leaf.
		if !isFirstCommit && len(commit.Parents) == 0 {
			return nil, fmt.Errorf("multiple leafs in the entity DAG")
		}

		opp, err := readOperationPack(def, repo, commit)
		if err != nil {
			return nil, err
		}

		err = opp.Validate()
		if err != nil {
			return nil, err
		}

		// Check that the create lamport clock is set (not checked in Validate() as it's optional)
		if isFirstCommit && opp.CreateTime <= 0 {
			return nil, fmt.Errorf("creation lamport time not set")
		}

		// make sure that the lamport clocks causality match the DAG topology
		for _, parentHash := range commit.Parents {
			parentPack, ok := oppMap[parentHash]
			if !ok {
				// every parent was processed in an earlier iteration by construction;
				// missing one means the DFS above is broken, a programmer bug
				panic("DFS failed")
			}

			// a child edit must strictly follow all of its parents' edits
			if parentPack.EditTime >= opp.EditTime {
				return nil, fmt.Errorf("lamport clock ordering doesn't match the DAG")
			}

			// to avoid an attack where clocks are pushed toward the uint64 rollover, make sure
			// that the clocks don't jump too far in the future
			// we ignore merge commits here to allow merging after a loooong time without breaking anything,
			// as long as there is one valid chain of small hops, it's fine.
			if !isMerge && opp.EditTime-parentPack.EditTime > 1_000_000 {
				return nil, fmt.Errorf("lamport clock jumping too far in the future, likely an attack")
			}

			// TODO: PackTime is not checked
		}

		oppMap[commit.Hash] = opp
		opsCount += len(opp.Operations)
	}

	// The clocks are fine, we witness them
	for _, opp := range oppMap {
		err = repo.Witness(fmt.Sprintf(creationClockPattern, def.namespace), opp.CreateTime)
		if err != nil {
			return nil, err
		}
		err = repo.Witness(fmt.Sprintf(editClockPattern, def.namespace), opp.EditTime)
		if err != nil {
			return nil, err
		}
		// err = packClock.Witness(opp.PackTime)
		// if err != nil {
		// 	return nil, err
		// }
	}

	// Now that we know that the topological order and clocks are fine, we order the operationPacks
	// based on the logical clocks, entirely ignoring the DAG topology

	oppSlice := make([]*operationPack, 0, len(oppMap))
	for _, pack := range oppMap {
		oppSlice = append(oppSlice, pack)
	}
	sort.Slice(oppSlice, func(i, j int) bool {
		// Primary ordering with the dedicated "pack" Lamport time that encode causality
		// within the entity
		// if oppSlice[i].PackTime != oppSlice[j].PackTime {
		// 	return oppSlice[i].PackTime < oppSlice[i].PackTime
		// }
		// We have equal PackTime, which means we had a concurrent edition. We can't tell which exactly
		// came first. As a secondary arbitrary ordering, we can use the EditTime. It's unlikely to be
		// enough but it can give us an edge to approach what really happened.
		if oppSlice[i].EditTime != oppSlice[j].EditTime {
			return oppSlice[i].EditTime < oppSlice[j].EditTime
		}
		// Well, what now? We still need a total ordering and the most stable possible.
		// As a last resort, we can order based on a hash of the serialized Operations in the
		// operationPack. It doesn't carry much meaning but it's unbiased and hard to abuse.
		// This is a lexicographic ordering on the stringified ID.
		return oppSlice[i].Id() < oppSlice[j].Id()
	})

	// Now that we ordered the operationPacks, we have the order of the Operations

	ops := make([]Operation, 0, opsCount)
	for _, pack := range oppSlice {
		for _, operation := range pack.Operations {
			ops = append(ops, operation)
		}
	}

	return &Entity{
		Definition: def,
		ops:        ops,
		// packClock:  packClock,
		lastCommit: rootHash,
	}, nil
}
|
|
|
|
|
2021-01-25 14:39:34 +03:00
|
|
|
// StreamedEntity is one result of ReadAll: either a decoded Entity or the
// error that interrupted the stream. Exactly one of the two fields is set.
type StreamedEntity struct {
	// Entity is the decoded entity, nil when Err is set
	Entity *Entity
	// Err is the read/decode error that terminated the stream, nil otherwise
	Err error
}
|
|
|
|
|
|
|
|
// ReadAll read and parse all local Entity
|
|
|
|
func ReadAll(def Definition, repo repository.ClockedRepo) <-chan StreamedEntity {
|
|
|
|
out := make(chan StreamedEntity)
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
defer close(out)
|
|
|
|
|
|
|
|
refPrefix := fmt.Sprintf("refs/%s/", def.namespace)
|
|
|
|
|
|
|
|
refs, err := repo.ListRefs(refPrefix)
|
|
|
|
if err != nil {
|
|
|
|
out <- StreamedEntity{Err: err}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, ref := range refs {
|
|
|
|
e, err := read(def, repo, ref)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
out <- StreamedEntity{Err: err}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
out <- StreamedEntity{Entity: e}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
return out
|
|
|
|
}
|
|
|
|
|
2020-12-21 20:42:04 +03:00
|
|
|
// Id return the Entity identifier
func (e *Entity) Id() entity.Id {
	// id is the id of the first operation
	// NOTE(review): FirstOp() returns nil for an entity with no operations,
	// which would make this call panic — callers presumably only use Id()
	// on non-empty entities; confirm.
	return e.FirstOp().Id()
}
|
|
|
|
|
2021-01-04 01:59:25 +03:00
|
|
|
// Validate check if the Entity data is valid
|
2020-12-21 20:42:04 +03:00
|
|
|
func (e *Entity) Validate() error {
|
|
|
|
// non-empty
|
|
|
|
if len(e.ops) == 0 && len(e.staging) == 0 {
|
|
|
|
return fmt.Errorf("entity has no operations")
|
|
|
|
}
|
|
|
|
|
|
|
|
// check if each operations are valid
|
|
|
|
for _, op := range e.ops {
|
|
|
|
if err := op.Validate(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// check if staging is valid if needed
|
|
|
|
for _, op := range e.staging {
|
|
|
|
if err := op.Validate(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Check that there is no colliding operation's ID
|
2021-01-04 01:59:25 +03:00
|
|
|
ids := make(map[entity.Id]struct{})
|
2020-12-21 20:42:04 +03:00
|
|
|
for _, op := range e.Operations() {
|
|
|
|
if _, ok := ids[op.Id()]; ok {
|
|
|
|
return fmt.Errorf("id collision: %s", op.Id())
|
|
|
|
}
|
|
|
|
ids[op.Id()] = struct{}{}
|
|
|
|
}
|
2020-12-21 13:10:43 +03:00
|
|
|
|
2020-12-21 20:42:04 +03:00
|
|
|
return nil
|
2020-12-21 13:10:43 +03:00
|
|
|
}
|
|
|
|
|
2021-01-04 01:59:25 +03:00
|
|
|
// Operations return the ordered operations
|
2020-12-21 13:10:43 +03:00
|
|
|
func (e *Entity) Operations() []Operation {
|
2020-12-21 20:42:04 +03:00
|
|
|
return append(e.ops, e.staging...)
|
2020-12-21 13:10:43 +03:00
|
|
|
}
|
|
|
|
|
2021-01-04 01:59:25 +03:00
|
|
|
// FirstOp lookup for the very first operation of the Entity
|
2020-12-21 20:42:04 +03:00
|
|
|
func (e *Entity) FirstOp() Operation {
|
|
|
|
for _, op := range e.ops {
|
|
|
|
return op
|
|
|
|
}
|
|
|
|
for _, op := range e.staging {
|
|
|
|
return op
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-01-04 01:59:25 +03:00
|
|
|
// LastOp lookup for the very last operation of the Entity
|
|
|
|
func (e *Entity) LastOp() Operation {
|
|
|
|
if len(e.staging) > 0 {
|
|
|
|
return e.staging[len(e.staging)-1]
|
|
|
|
}
|
|
|
|
if len(e.ops) > 0 {
|
|
|
|
return e.ops[len(e.ops)-1]
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Append add a new Operation to the Entity. The operation is only staged
// in memory; call Commit (or CommitAdNeeded) to persist it.
func (e *Entity) Append(op Operation) {
	e.staging = append(e.staging, op)
}
|
|
|
|
|
2021-01-04 01:59:25 +03:00
|
|
|
// NeedCommit indicate if the in-memory state changed and need to be commit in the repository
func (e *Entity) NeedCommit() bool {
	// any staged operation means the entity is out of sync with the repository
	return len(e.staging) > 0
}
|
|
|
|
|
2021-01-04 01:59:25 +03:00
|
|
|
// CommitAdNeeded execute a Commit only if necessary. This function is useful to avoid getting an error if the Entity
|
|
|
|
// is already in sync with the repository.
|
2020-12-21 20:42:04 +03:00
|
|
|
func (e *Entity) CommitAdNeeded(repo repository.ClockedRepo) error {
|
|
|
|
if e.NeedCommit() {
|
|
|
|
return e.Commit(repo)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-01-04 01:59:25 +03:00
|
|
|
// Commit write the appended operations in the repository.
//
// The staged operations are bundled into a single operationPack, stamped with
// the namespace's lamport clocks, written as a new git commit chained to the
// previous one, then the entity's ref is updated. Errors if nothing is staged.
func (e *Entity) Commit(repo repository.ClockedRepo) error {
	if !e.NeedCommit() {
		return fmt.Errorf("can't commit an entity with no pending operation")
	}

	if err := e.Validate(); err != nil {
		return errors.Wrapf(err, "can't commit a %s with invalid data", e.Definition.typename)
	}

	// all staged operations must share a single author, which signs the pack
	var author identity.Interface
	for _, op := range e.staging {
		if author != nil && op.Author() != author {
			return fmt.Errorf("operations with different author")
		}
		author = op.Author()
	}

	// increment the various clocks for this new operationPack
	// packTime, err := e.packClock.Increment()
	// if err != nil {
	// 	return err
	// }
	editTime, err := repo.Increment(fmt.Sprintf(editClockPattern, e.namespace))
	if err != nil {
		return err
	}
	// the creation clock is only ticked for the very first commit of the entity;
	// otherwise CreateTime stays at its zero value
	var creationTime lamport.Time
	if e.lastCommit == "" {
		creationTime, err = repo.Increment(fmt.Sprintf(creationClockPattern, e.namespace))
		if err != nil {
			return err
		}
	}

	opp := &operationPack{
		Author:     author,
		Operations: e.staging,
		CreateTime: creationTime,
		EditTime:   editTime,
		// PackTime:   packTime,
	}

	// first commit has no parent; later commits chain onto lastCommit
	var commitHash repository.Hash
	if e.lastCommit == "" {
		commitHash, err = opp.Write(e.Definition, repo)
	} else {
		commitHash, err = opp.Write(e.Definition, repo, e.lastCommit)
	}

	if err != nil {
		return err
	}

	// the write succeeded: move staging into the stored operations
	e.lastCommit = commitHash
	e.ops = append(e.ops, e.staging...)
	e.staging = nil

	// Create or update the Git reference for this entity
	// When pushing later, the remote will ensure that this ref update
	// is fast-forward, that is no data has been overwritten.
	ref := fmt.Sprintf(refsPattern, e.namespace, e.Id().String())
	return repo.UpdateRef(ref, commitHash)
}
|