Merge pull request #254 from MichaelMure/import-events

Bridge: correctly emit NothingEvents
This commit is contained in:
Michael Muré 2019-11-19 21:13:13 +01:00 committed by GitHub
commit dcf3feb0b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 48 additions and 68 deletions

View File

@ -148,8 +148,6 @@ func (ge *githubExporter) ExportAll(ctx context.Context, repo *cache.RepoCache,
if snapshot.HasAnyActor(allIdentitiesIds...) { if snapshot.HasAnyActor(allIdentitiesIds...) {
// try to export the bug and it associated events // try to export the bug and it associated events
ge.exportBug(ctx, b, since, out) ge.exportBug(ctx, b, since, out)
} else {
out <- core.NewExportNothing(id, "not an actor")
} }
} }
} }
@ -161,6 +159,7 @@ func (ge *githubExporter) ExportAll(ctx context.Context, repo *cache.RepoCache,
// exportBug publish bugs and related events // exportBug publish bugs and related events
func (ge *githubExporter) exportBug(ctx context.Context, b *cache.BugCache, since time.Time, out chan<- core.ExportResult) { func (ge *githubExporter) exportBug(ctx context.Context, b *cache.BugCache, since time.Time, out chan<- core.ExportResult) {
snapshot := b.Snapshot() snapshot := b.Snapshot()
var bugUpdated bool
var bugGithubID string var bugGithubID string
var bugGithubURL string var bugGithubURL string
@ -198,13 +197,12 @@ func (ge *githubExporter) exportBug(ctx context.Context, b *cache.BugCache, sinc
return return
} }
// ignore issue comming from other repositories // ignore issue coming from other repositories
if owner != ge.conf[keyOwner] && project != ge.conf[keyProject] { if owner != ge.conf[keyOwner] && project != ge.conf[keyProject] {
out <- core.NewExportNothing(b.Id(), fmt.Sprintf("skipping issue from url:%s", githubURL)) out <- core.NewExportNothing(b.Id(), fmt.Sprintf("skipping issue from url:%s", githubURL))
return return
} }
out <- core.NewExportNothing(b.Id(), "bug already exported")
// will be used to mark operation related to a bug as exported // will be used to mark operation related to a bug as exported
bugGithubID = githubID bugGithubID = githubID
bugGithubURL = githubURL bugGithubURL = githubURL
@ -260,24 +258,20 @@ func (ge *githubExporter) exportBug(ctx context.Context, b *cache.BugCache, sinc
// cache the ID of already exported or imported issues and events from Github // cache the ID of already exported or imported issues and events from Github
if id, ok := op.GetMetadata(metaKeyGithubId); ok { if id, ok := op.GetMetadata(metaKeyGithubId); ok {
ge.cachedOperationIDs[op.Id()] = id ge.cachedOperationIDs[op.Id()] = id
out <- core.NewExportNothing(op.Id(), "already exported operation")
continue continue
} }
opAuthor := op.GetAuthor() opAuthor := op.GetAuthor()
client, err := ge.getIdentityClient(opAuthor.Id()) client, err := ge.getIdentityClient(opAuthor.Id())
if err != nil { if err != nil {
out <- core.NewExportNothing(op.Id(), "missing operation author token")
continue continue
} }
var id, url string var id, url string
switch op.(type) { switch op := op.(type) {
case *bug.AddCommentOperation: case *bug.AddCommentOperation:
opr := op.(*bug.AddCommentOperation)
// send operation to github // send operation to github
id, url, err = addCommentGithubIssue(ctx, client, bugGithubID, opr.Message) id, url, err = addCommentGithubIssue(ctx, client, bugGithubID, op.Message)
if err != nil { if err != nil {
err := errors.Wrap(err, "adding comment") err := errors.Wrap(err, "adding comment")
out <- core.NewExportError(err, b.Id()) out <- core.NewExportError(err, b.Id())
@ -290,14 +284,11 @@ func (ge *githubExporter) exportBug(ctx context.Context, b *cache.BugCache, sinc
ge.cachedOperationIDs[op.Id()] = id ge.cachedOperationIDs[op.Id()] = id
case *bug.EditCommentOperation: case *bug.EditCommentOperation:
opr := op.(*bug.EditCommentOperation)
// Since github doesn't consider the issue body as a comment // Since github doesn't consider the issue body as a comment
if opr.Target == createOp.Id() { if op.Target == createOp.Id() {
// case bug creation operation: we need to edit the Github issue // case bug creation operation: we need to edit the Github issue
if err := updateGithubIssueBody(ctx, client, bugGithubID, opr.Message); err != nil { if err := updateGithubIssueBody(ctx, client, bugGithubID, op.Message); err != nil {
err := errors.Wrap(err, "editing issue") err := errors.Wrap(err, "editing issue")
out <- core.NewExportError(err, b.Id()) out <- core.NewExportError(err, b.Id())
return return
@ -311,12 +302,12 @@ func (ge *githubExporter) exportBug(ctx context.Context, b *cache.BugCache, sinc
} else { } else {
// case comment edition operation: we need to edit the Github comment // case comment edition operation: we need to edit the Github comment
commentID, ok := ge.cachedOperationIDs[opr.Target] commentID, ok := ge.cachedOperationIDs[op.Target]
if !ok { if !ok {
panic("unexpected error: comment id not found") panic("unexpected error: comment id not found")
} }
eid, eurl, err := editCommentGithubIssue(ctx, client, commentID, opr.Message) eid, eurl, err := editCommentGithubIssue(ctx, client, commentID, op.Message)
if err != nil { if err != nil {
err := errors.Wrap(err, "editing comment") err := errors.Wrap(err, "editing comment")
out <- core.NewExportError(err, b.Id()) out <- core.NewExportError(err, b.Id())
@ -331,8 +322,7 @@ func (ge *githubExporter) exportBug(ctx context.Context, b *cache.BugCache, sinc
} }
case *bug.SetStatusOperation: case *bug.SetStatusOperation:
opr := op.(*bug.SetStatusOperation) if err := updateGithubIssueStatus(ctx, client, bugGithubID, op.Status); err != nil {
if err := updateGithubIssueStatus(ctx, client, bugGithubID, opr.Status); err != nil {
err := errors.Wrap(err, "editing status") err := errors.Wrap(err, "editing status")
out <- core.NewExportError(err, b.Id()) out <- core.NewExportError(err, b.Id())
return return
@ -344,8 +334,7 @@ func (ge *githubExporter) exportBug(ctx context.Context, b *cache.BugCache, sinc
url = bugGithubURL url = bugGithubURL
case *bug.SetTitleOperation: case *bug.SetTitleOperation:
opr := op.(*bug.SetTitleOperation) if err := updateGithubIssueTitle(ctx, client, bugGithubID, op.Title); err != nil {
if err := updateGithubIssueTitle(ctx, client, bugGithubID, opr.Title); err != nil {
err := errors.Wrap(err, "editing title") err := errors.Wrap(err, "editing title")
out <- core.NewExportError(err, b.Id()) out <- core.NewExportError(err, b.Id())
return return
@ -357,8 +346,7 @@ func (ge *githubExporter) exportBug(ctx context.Context, b *cache.BugCache, sinc
url = bugGithubURL url = bugGithubURL
case *bug.LabelChangeOperation: case *bug.LabelChangeOperation:
opr := op.(*bug.LabelChangeOperation) if err := ge.updateGithubIssueLabels(ctx, client, bugGithubID, op.Added, op.Removed); err != nil {
if err := ge.updateGithubIssueLabels(ctx, client, bugGithubID, opr.Added, opr.Removed); err != nil {
err := errors.Wrap(err, "updating labels") err := errors.Wrap(err, "updating labels")
out <- core.NewExportError(err, b.Id()) out <- core.NewExportError(err, b.Id())
return return
@ -386,6 +374,12 @@ func (ge *githubExporter) exportBug(ctx context.Context, b *cache.BugCache, sinc
out <- core.NewExportError(err, b.Id()) out <- core.NewExportError(err, b.Id())
return return
} }
bugUpdated = true
}
if !bugUpdated {
out <- core.NewExportNothing(b.Id(), "nothing has been exported")
} }
} }

View File

@ -60,15 +60,18 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache,
// loop over timeline items // loop over timeline items
for gi.iterator.NextTimelineItem() { for gi.iterator.NextTimelineItem() {
item := gi.iterator.TimelineItemValue() item := gi.iterator.TimelineItemValue()
if err := gi.ensureTimelineItem(repo, b, item); err != nil { err := gi.ensureTimelineItem(repo, b, item)
err := fmt.Errorf("timeline item creation: %v", err) if err != nil {
err = fmt.Errorf("timeline item creation: %v", err)
out <- core.NewImportError(err, "") out <- core.NewImportError(err, "")
return return
} }
} }
// commit bug state if !b.NeedCommit() {
if err := b.CommitAsNeeded(); err != nil { out <- core.NewImportNothing(b.Id(), "no imported operation")
} else if err := b.Commit(); err != nil {
// commit bug state
err = fmt.Errorf("bug commit: %v", err) err = fmt.Errorf("bug commit: %v", err)
out <- core.NewImportError(err, "") out <- core.NewImportError(err, "")
return return
@ -128,16 +131,12 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline
// importing a new bug // importing a new bug
gi.out <- core.NewImportBug(b.Id()) gi.out <- core.NewImportBug(b.Id())
} else {
gi.out <- core.NewImportNothing("", "bug already imported")
} }
} else { } else {
// create bug from given issueEdits // create bug from given issueEdits
for i, edit := range issueEdits { for i, edit := range issueEdits {
if i == 0 && b != nil { if i == 0 && b != nil {
// The first edit in the github result is the issue creation itself, we already have that // The first edit in the github result is the issue creation itself, we already have that
gi.out <- core.NewImportNothing("", "bug already imported")
continue continue
} }
@ -165,7 +164,6 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline
if err != nil { if err != nil {
return nil, err return nil, err
} }
// importing a new bug // importing a new bug
gi.out <- core.NewImportBug(b.Id()) gi.out <- core.NewImportBug(b.Id())
continue continue
@ -202,13 +200,12 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
if err != nil { if err != nil {
return fmt.Errorf("timeline comment creation: %v", err) return fmt.Errorf("timeline comment creation: %v", err)
} }
return nil
case "LabeledEvent": case "LabeledEvent":
id := parseId(item.LabeledEvent.Id) id := parseId(item.LabeledEvent.Id)
_, err := b.ResolveOperationWithMetadata(metaKeyGithubId, id) _, err := b.ResolveOperationWithMetadata(metaKeyGithubId, id)
if err == nil { if err == nil {
reason := fmt.Sprintf("operation already imported: %v", item.Typename)
gi.out <- core.NewImportNothing("", reason)
return nil return nil
} }
@ -239,8 +236,6 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
id := parseId(item.UnlabeledEvent.Id) id := parseId(item.UnlabeledEvent.Id)
_, err := b.ResolveOperationWithMetadata(metaKeyGithubId, id) _, err := b.ResolveOperationWithMetadata(metaKeyGithubId, id)
if err == nil { if err == nil {
reason := fmt.Sprintf("operation already imported: %v", item.Typename)
gi.out <- core.NewImportNothing("", reason)
return nil return nil
} }
if err != cache.ErrNoMatchingOp { if err != cache.ErrNoMatchingOp {
@ -274,8 +269,6 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
return err return err
} }
if err == nil { if err == nil {
reason := fmt.Sprintf("operation already imported: %v", item.Typename)
gi.out <- core.NewImportNothing("", reason)
return nil return nil
} }
author, err := gi.ensurePerson(repo, item.ClosedEvent.Actor) author, err := gi.ensurePerson(repo, item.ClosedEvent.Actor)
@ -302,8 +295,6 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
return err return err
} }
if err == nil { if err == nil {
reason := fmt.Sprintf("operation already imported: %v", item.Typename)
gi.out <- core.NewImportNothing("", reason)
return nil return nil
} }
author, err := gi.ensurePerson(repo, item.ReopenedEvent.Actor) author, err := gi.ensurePerson(repo, item.ReopenedEvent.Actor)
@ -330,8 +321,6 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
return err return err
} }
if err == nil { if err == nil {
reason := fmt.Sprintf("operation already imported: %v", item.Typename)
gi.out <- core.NewImportNothing("", reason)
return nil return nil
} }
author, err := gi.ensurePerson(repo, item.RenamedTitleEvent.Actor) author, err := gi.ensurePerson(repo, item.RenamedTitleEvent.Actor)
@ -350,10 +339,6 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
gi.out <- core.NewImportTitleEdition(op.Id()) gi.out <- core.NewImportTitleEdition(op.Id())
return nil return nil
default:
reason := fmt.Sprintf("ignoring timeline type: %v", item.Typename)
gi.out <- core.NewImportNothing("", reason)
} }
return nil return nil
@ -367,9 +352,7 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache.
} }
targetOpID, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(item.Id)) targetOpID, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(item.Id))
if err == nil { if err != nil && err != cache.ErrNoMatchingOp {
gi.out <- core.NewImportNothing("", "comment already imported")
} else if err != cache.ErrNoMatchingOp {
// real error // real error
return err return err
} }
@ -398,13 +381,13 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache.
} }
gi.out <- core.NewImportComment(op.Id()) gi.out <- core.NewImportComment(op.Id())
return nil
} }
} else { } else {
for i, edit := range edits { for i, edit := range edits {
if i == 0 && targetOpID != "" { if i == 0 && targetOpID != "" {
// The first edit in the github result is the comment creation itself, we already have that // The first edit in the github result is the comment creation itself, we already have that
gi.out <- core.NewImportNothing("", "comment already imported")
continue continue
} }
@ -434,6 +417,7 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache.
if err != nil { if err != nil {
return err return err
} }
gi.out <- core.NewImportComment(op.Id())
// set target for the nexr edit now that the comment is created // set target for the nexr edit now that the comment is created
targetOpID = op.Id() targetOpID = op.Id()
@ -452,7 +436,6 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache.
func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugCache, target entity.Id, edit userContentEdit) error { func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugCache, target entity.Id, edit userContentEdit) error {
_, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(edit.Id)) _, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(edit.Id))
if err == nil { if err == nil {
gi.out <- core.NewImportNothing(b.Id(), "edition already imported")
return nil return nil
} }
if err != cache.ErrNoMatchingOp { if err != cache.ErrNoMatchingOp {
@ -468,7 +451,7 @@ func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugC
switch { switch {
case edit.DeletedAt != nil: case edit.DeletedAt != nil:
// comment deletion, not supported yet // comment deletion, not supported yet
gi.out <- core.NewImportNothing(b.Id(), "comment deletion is not supported yet") return nil
case edit.DeletedAt == nil: case edit.DeletedAt == nil:
@ -493,8 +476,8 @@ func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugC
} }
gi.out <- core.NewImportCommentEdition(op.Id()) gi.out <- core.NewImportCommentEdition(op.Id())
return nil
} }
return nil return nil
} }

View File

@ -117,8 +117,6 @@ func (ge *gitlabExporter) ExportAll(ctx context.Context, repo *cache.RepoCache,
if snapshot.HasAnyActor(allIdentitiesIds...) { if snapshot.HasAnyActor(allIdentitiesIds...) {
// try to export the bug and it associated events // try to export the bug and it associated events
ge.exportBug(ctx, b, since, out) ge.exportBug(ctx, b, since, out)
} else {
out <- core.NewExportNothing(id, "not an actor")
} }
} }
} }
@ -131,6 +129,7 @@ func (ge *gitlabExporter) ExportAll(ctx context.Context, repo *cache.RepoCache,
func (ge *gitlabExporter) exportBug(ctx context.Context, b *cache.BugCache, since time.Time, out chan<- core.ExportResult) { func (ge *gitlabExporter) exportBug(ctx context.Context, b *cache.BugCache, since time.Time, out chan<- core.ExportResult) {
snapshot := b.Snapshot() snapshot := b.Snapshot()
var bugUpdated bool
var err error var err error
var bugGitlabID int var bugGitlabID int
var bugGitlabIDString string var bugGitlabIDString string
@ -166,8 +165,6 @@ func (ge *gitlabExporter) exportBug(ctx context.Context, b *cache.BugCache, sinc
return return
} }
out <- core.NewExportNothing(b.Id(), "bug already exported")
// will be used to mark operation related to a bug as exported // will be used to mark operation related to a bug as exported
bugGitlabIDString = gitlabID bugGitlabIDString = gitlabID
bugGitlabID, err = strconv.Atoi(bugGitlabIDString) bugGitlabID, err = strconv.Atoi(bugGitlabIDString)
@ -237,14 +234,12 @@ func (ge *gitlabExporter) exportBug(ctx context.Context, b *cache.BugCache, sinc
// cache the ID of already exported or imported issues and events from Gitlab // cache the ID of already exported or imported issues and events from Gitlab
if id, ok := op.GetMetadata(metaKeyGitlabId); ok { if id, ok := op.GetMetadata(metaKeyGitlabId); ok {
ge.cachedOperationIDs[op.Id().String()] = id ge.cachedOperationIDs[op.Id().String()] = id
out <- core.NewExportNothing(op.Id(), "already exported operation")
continue continue
} }
opAuthor := op.GetAuthor() opAuthor := op.GetAuthor()
client, err := ge.getIdentityClient(opAuthor.Id()) client, err := ge.getIdentityClient(opAuthor.Id())
if err != nil { if err != nil {
out <- core.NewExportNothing(op.Id(), "missing operation author token")
continue continue
} }
@ -371,6 +366,12 @@ func (ge *gitlabExporter) exportBug(ctx context.Context, b *cache.BugCache, sinc
out <- core.NewExportError(err, b.Id()) out <- core.NewExportError(err, b.Id())
return return
} }
bugUpdated = true
}
if !bugUpdated {
out <- core.NewExportNothing(b.Id(), "nothing has been exported")
} }
} }

View File

@ -73,8 +73,10 @@ func (gi *gitlabImporter) ImportAll(ctx context.Context, repo *cache.RepoCache,
} }
} }
// commit bug state if !b.NeedCommit() {
if err := b.CommitAsNeeded(); err != nil { out <- core.NewImportNothing(b.Id(), "no imported operation")
} else if err := b.Commit(); err != nil {
// commit bug state
err := fmt.Errorf("bug commit: %v", err) err := fmt.Errorf("bug commit: %v", err)
out <- core.NewImportError(err, "") out <- core.NewImportError(err, "")
return return
@ -99,7 +101,6 @@ func (gi *gitlabImporter) ensureIssue(repo *cache.RepoCache, issue *gitlab.Issue
// resolve bug // resolve bug
b, err := repo.ResolveBugCreateMetadata(metaKeyGitlabUrl, issue.WebURL) b, err := repo.ResolveBugCreateMetadata(metaKeyGitlabUrl, issue.WebURL)
if err == nil { if err == nil {
gi.out <- core.NewImportNothing("", "bug already imported")
return b, nil return b, nil
} }
if err != bug.ErrBugNotExist { if err != bug.ErrBugNotExist {
@ -299,8 +300,6 @@ func (gi *gitlabImporter) ensureNote(repo *cache.RepoCache, b *cache.BugCache, n
NOTE_MENTIONED_IN_ISSUE, NOTE_MENTIONED_IN_ISSUE,
NOTE_MENTIONED_IN_MERGE_REQUEST: NOTE_MENTIONED_IN_MERGE_REQUEST:
reason := fmt.Sprintf("unsupported note type: %s", noteType.String())
gi.out <- core.NewImportNothing("", reason)
return nil return nil
default: default:

View File

@ -103,8 +103,6 @@ func (li *launchpadImporter) ImportAll(ctx context.Context, repo *cache.RepoCach
/* Handle messages */ /* Handle messages */
if len(lpBug.Messages) == 0 { if len(lpBug.Messages) == 0 {
err := fmt.Sprintf("bug doesn't have any comments")
out <- core.NewImportNothing(entity.Id(lpBugID), err)
return return
} }
@ -149,8 +147,9 @@ func (li *launchpadImporter) ImportAll(ctx context.Context, repo *cache.RepoCach
out <- core.NewImportComment(op.Id()) out <- core.NewImportComment(op.Id())
} }
err = b.CommitAsNeeded() if !b.NeedCommit() {
if err != nil { out <- core.NewImportNothing(b.Id(), "no imported operation")
} else if err := b.Commit(); err != nil {
out <- core.NewImportError(err, "") out <- core.NewImportError(err, "")
return return
} }

4
cache/bug_cache.go vendored
View File

@ -265,3 +265,7 @@ func (c *BugCache) CommitAsNeeded() error {
} }
return c.notifyUpdated() return c.notifyUpdated()
} }
func (c *BugCache) NeedCommit() bool {
return c.bug.NeedCommit()
}