github: fix data race when closing event channel

I believe the issue was twofold:

When done importing, the calling context is likely still valid, so if the output channel is not read enough and reach capacity, some event producer down the line can be blocked trying to send in that channel. When closing it, this send is still trying to proceed, which is illegal in go.

In rateLimitHandlerClient, there was a need to 2 different type of output channel: core.ExportResult and ImportEvent. To do so, the previous code was using a single channel type RateLimitingEvent and a series of goroutines to read/cast/send to the final channel. This could result in more async goroutine being stuck trying to send in an at-capacity channel. Instead, the code now use a simple synchronous callback to directly push to the final output channel. No concurrency needed anymore and the code is simpler.

Any of those fixes could have resolved the data race, but both fixes is more correct.
This commit is contained in:
Michael Muré 2022-06-05 15:01:08 +02:00
parent 96327c3371
commit 7348fb9edb
No known key found for this signature in database
GPG Key ID: A4457C029293126F
4 changed files with 147 additions and 154 deletions

View File

@ -7,8 +7,9 @@ import (
"strings"
"time"
"github.com/MichaelMure/git-bug/bridge/core"
"github.com/shurcooL/githubv4"
"github.com/MichaelMure/git-bug/bridge/core"
)
var _ Client = &githubv4.Client{}
@ -29,79 +30,69 @@ func newRateLimitHandlerClient(httpClient *http.Client) *rateLimitHandlerClient
return &rateLimitHandlerClient{sc: githubv4.NewClient(httpClient)}
}
type RateLimitingEvent struct {
msg string
}
// mutate calls the github api with a graphql mutation and for each rate limiting event it sends an
// export result.
// mutate calls the github api with a graphql mutation and sends a core.ExportResult for each rate limiting event
func (c *rateLimitHandlerClient) mutate(ctx context.Context, m interface{}, input githubv4.Input, vars map[string]interface{}, out chan<- core.ExportResult) error {
// prepare a closure for the mutation
mutFun := func(ctx context.Context) error {
return c.sc.Mutate(ctx, m, input, vars)
}
limitEvents := make(chan RateLimitingEvent)
defer close(limitEvents)
go func() {
for e := range limitEvents {
callback := func(msg string) {
select {
case <-ctx.Done():
return
case out <- core.NewExportRateLimiting(e.msg):
case out <- core.NewExportRateLimiting(msg):
}
}
}()
return c.callAPIAndRetry(mutFun, ctx, limitEvents)
return c.callAPIAndRetry(ctx, mutFun, callback)
}
// queryWithLimitEvents calls the github api with a graphql query and it sends rate limiting events
// to a given channel of type RateLimitingEvent.
func (c *rateLimitHandlerClient) queryWithLimitEvents(ctx context.Context, query interface{}, vars map[string]interface{}, limitEvents chan<- RateLimitingEvent) error {
// prepare a closure fot the query
// queryImport calls the github api with a graphql query, and sends an ImportEvent for each rate limiting event
func (c *rateLimitHandlerClient) queryImport(ctx context.Context, query interface{}, vars map[string]interface{}, importEvents chan<- ImportEvent) error {
// prepare a closure for the query
queryFun := func(ctx context.Context) error {
return c.sc.Query(ctx, query, vars)
}
return c.callAPIAndRetry(queryFun, ctx, limitEvents)
}
// queryWithImportEvents calls the github api with a graphql query and it sends rate limiting events
// to a given channel of type ImportEvent.
func (c *rateLimitHandlerClient) queryWithImportEvents(ctx context.Context, query interface{}, vars map[string]interface{}, importEvents chan<- ImportEvent) error {
// forward rate limiting events to channel of import events
limitEvents := make(chan RateLimitingEvent)
defer close(limitEvents)
go func() {
for e := range limitEvents {
callback := func(msg string) {
select {
case <-ctx.Done():
return
case importEvents <- e:
case importEvents <- RateLimitingEvent{msg}:
}
}
}()
return c.queryWithLimitEvents(ctx, query, vars, limitEvents)
return c.callAPIAndRetry(ctx, queryFun, callback)
}
// queryPrintMsgs calls the github api with a graphql query and it prints for ever rate limiting
// event a message to stdout.
func (c *rateLimitHandlerClient) queryPrintMsgs(ctx context.Context, query interface{}, vars map[string]interface{}) error {
// print rate limiting events directly to stdout.
limitEvents := make(chan RateLimitingEvent)
defer close(limitEvents)
go func() {
for e := range limitEvents {
fmt.Println(e.msg)
// queryImport calls the github api with a graphql query, and sends a core.ExportResult for each rate limiting event
func (c *rateLimitHandlerClient) queryExport(ctx context.Context, query interface{}, vars map[string]interface{}, out chan<- core.ExportResult) error {
// prepare a closure for the query
queryFun := func(ctx context.Context) error {
return c.sc.Query(ctx, query, vars)
}
}()
return c.queryWithLimitEvents(ctx, query, vars, limitEvents)
callback := func(msg string) {
select {
case <-ctx.Done():
case out <- core.NewExportRateLimiting(msg):
}
}
return c.callAPIAndRetry(ctx, queryFun, callback)
}
// queryPrintMsgs calls the github api with a graphql query, and prints a message to stdout for every rate limiting event .
func (c *rateLimitHandlerClient) queryPrintMsgs(ctx context.Context, query interface{}, vars map[string]interface{}) error {
// prepare a closure for the query
queryFun := func(ctx context.Context) error {
return c.sc.Query(ctx, query, vars)
}
callback := func(msg string) {
fmt.Println(msg)
}
return c.callAPIAndRetry(ctx, queryFun, callback)
}
// callAPIAndRetry calls the Github GraphQL API (inderectely through callAPIDealWithLimit) and in
// case of error it repeats the request to the Github API. The parameter `apiCall` is intended to be
// a closure containing a query or a mutation to the Github GraphQL API.
func (c *rateLimitHandlerClient) callAPIAndRetry(apiCall func(context.Context) error, ctx context.Context, events chan<- RateLimitingEvent) error {
func (c *rateLimitHandlerClient) callAPIAndRetry(ctx context.Context, apiCall func(context.Context) error, rateLimitEvent func(msg string)) error {
var err error
if err = c.callAPIDealWithLimit(apiCall, ctx, events); err == nil {
if err = c.callAPIDealWithLimit(ctx, apiCall, rateLimitEvent); err == nil {
return nil
}
// failure; the reason may be temporary network problems or internal errors
@ -117,7 +108,7 @@ func (c *rateLimitHandlerClient) callAPIAndRetry(apiCall func(context.Context) e
stop(timer)
return ctx.Err()
case <-timer.C:
err = c.callAPIDealWithLimit(apiCall, ctx, events)
err = c.callAPIDealWithLimit(ctx, apiCall, rateLimitEvent)
if err == nil {
return nil
}
@ -127,10 +118,10 @@ func (c *rateLimitHandlerClient) callAPIAndRetry(apiCall func(context.Context) e
}
// callAPIDealWithLimit calls the Github GraphQL API and if the Github API returns a rate limiting
// error, then it waits until the rate limit is reset and it repeats the request to the API. The
// error, then it waits until the rate limit is reset, and it repeats the request to the API. The
// parameter `apiCall` is intended to be a closure containing a query or a mutation to the Github
// GraphQL API.
func (c *rateLimitHandlerClient) callAPIDealWithLimit(apiCall func(context.Context) error, ctx context.Context, events chan<- RateLimitingEvent) error {
func (c *rateLimitHandlerClient) callAPIDealWithLimit(ctx context.Context, apiCall func(context.Context) error, rateLimitCallback func(msg string)) error {
qctx, cancel := context.WithTimeout(ctx, defaultTimeout)
defer cancel()
// call the function fun()
@ -155,11 +146,8 @@ func (c *rateLimitHandlerClient) callAPIDealWithLimit(apiCall func(context.Conte
resetTime.String(),
)
// Send message about rate limiting event.
select {
case <-ctx.Done():
return ctx.Err()
case events <- RateLimitingEvent{msg}:
}
rateLimitCallback(msg)
// Pause current goroutine
timer := time.NewTimer(time.Until(resetTime))
select {

View File

@ -486,23 +486,10 @@ func (ge *githubExporter) cacheGithubLabels(ctx context.Context, gc *rateLimitHa
}
q := labelsQuery{}
// When performing the queries we have to forward rate limiting events to the
// current channel of export results.
events := make(chan RateLimitingEvent)
defer close(events)
go func() {
for e := range events {
select {
case <-ctx.Done():
return
case ge.out <- core.NewExportRateLimiting(e.msg):
}
}
}()
hasNextPage := true
for hasNextPage {
if err := gc.queryWithLimitEvents(ctx, &q, variables, events); err != nil {
if err := gc.queryExport(ctx, &q, variables, ge.out); err != nil {
return err
}

View File

@ -0,0 +1,40 @@
package github
import "github.com/shurcooL/githubv4"
type ImportEvent interface {
isImportEvent()
}
type RateLimitingEvent struct {
msg string
}
func (RateLimitingEvent) isImportEvent() {}
type IssueEvent struct {
issue
}
func (IssueEvent) isImportEvent() {}
type IssueEditEvent struct {
issueId githubv4.ID
userContentEdit
}
func (IssueEditEvent) isImportEvent() {}
type TimelineEvent struct {
issueId githubv4.ID
timelineItem
}
func (TimelineEvent) isImportEvent() {}
type CommentEditEvent struct {
commentId githubv4.ID
userContentEdit
}
func (CommentEditEvent) isImportEvent() {}

View File

@ -9,6 +9,7 @@ import (
const (
// These values influence how fast the github graphql rate limit is exhausted.
NumIssues = 40
NumIssueEdits = 100
NumTimelineItems = 100
@ -41,43 +42,6 @@ type importMediator struct {
err error
}
type ImportEvent interface {
isImportEvent()
}
func (RateLimitingEvent) isImportEvent() {}
type IssueEvent struct {
issue
}
func (IssueEvent) isImportEvent() {}
type IssueEditEvent struct {
issueId githubv4.ID
userContentEdit
}
func (IssueEditEvent) isImportEvent() {}
type TimelineEvent struct {
issueId githubv4.ID
timelineItem
}
func (TimelineEvent) isImportEvent() {}
type CommentEditEvent struct {
commentId githubv4.ID
userContentEdit
}
func (CommentEditEvent) isImportEvent() {}
func (mm *importMediator) NextImportEvent() ImportEvent {
return <-mm.importEvents
}
func NewImportMediator(ctx context.Context, client *rateLimitHandlerClient, owner, project string, since time.Time) *importMediator {
mm := importMediator{
gh: client,
@ -87,48 +51,24 @@ func NewImportMediator(ctx context.Context, client *rateLimitHandlerClient, owne
importEvents: make(chan ImportEvent, ChanCapacity),
err: nil,
}
go func() {
mm.fillImportEvents(ctx)
close(mm.importEvents)
}()
go mm.start(ctx)
return &mm
}
type varmap map[string]interface{}
func newIssueVars(owner, project string, since time.Time) varmap {
return varmap{
"owner": githubv4.String(owner),
"name": githubv4.String(project),
"issueSince": githubv4.DateTime{Time: since},
"issueFirst": githubv4.Int(NumIssues),
"issueEditLast": githubv4.Int(NumIssueEdits),
"issueEditBefore": (*githubv4.String)(nil),
"timelineFirst": githubv4.Int(NumTimelineItems),
"timelineAfter": (*githubv4.String)(nil),
"commentEditLast": githubv4.Int(NumCommentEdits),
"commentEditBefore": (*githubv4.String)(nil),
}
func (mm *importMediator) start(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
mm.fillImportEvents(ctx)
// Make sure we cancel everything when we are done, instead of relying on the parent context
// This should unblock pending send to the channel if the capacity was reached and avoid a panic/race when closing.
cancel()
close(mm.importEvents)
}
func newIssueEditVars() varmap {
return varmap{
"issueEditLast": githubv4.Int(NumIssueEdits),
}
}
func newTimelineVars() varmap {
return varmap{
"timelineFirst": githubv4.Int(NumTimelineItems),
"commentEditLast": githubv4.Int(NumCommentEdits),
"commentEditBefore": (*githubv4.String)(nil),
}
}
func newCommentEditVars() varmap {
return varmap{
"commentEditLast": githubv4.Int(NumCommentEdits),
}
// NextImportEvent returns the next ImportEvent, or nil if done.
func (mm *importMediator) NextImportEvent() ImportEvent {
return <-mm.importEvents
}
func (mm *importMediator) Error() error {
@ -138,7 +78,7 @@ func (mm *importMediator) Error() error {
func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) {
query := userQuery{}
vars := varmap{"login": githubv4.String(loginName)}
if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil {
if err := mm.gh.queryImport(ctx, &query, vars, mm.importEvents); err != nil {
return nil, err
}
return &query.User, nil
@ -200,7 +140,7 @@ func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID,
vars["issueEditBefore"] = cursor
}
query := issueEditQuery{}
if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil {
if err := mm.gh.queryImport(ctx, &query, vars, mm.importEvents); err != nil {
mm.err = err
return nil, false
}
@ -244,7 +184,7 @@ func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cu
vars["timelineAfter"] = cursor
}
query := timelineQuery{}
if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil {
if err := mm.gh.queryImport(ctx, &query, vars, mm.importEvents); err != nil {
mm.err = err
return nil, false
}
@ -294,7 +234,7 @@ func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID
vars["commentEditBefore"] = cursor
}
query := commentEditQuery{}
if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil {
if err := mm.gh.queryImport(ctx, &query, vars, mm.importEvents); err != nil {
mm.err = err
return nil, false
}
@ -313,7 +253,7 @@ func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String
vars["issueAfter"] = cursor
}
query := issueQuery{}
if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil {
if err := mm.gh.queryImport(ctx, &query, vars, mm.importEvents); err != nil {
mm.err = err
return nil, false
}
@ -334,3 +274,41 @@ func reverse(eds []userContentEdit) chan userContentEdit {
}()
return ret
}
// varmap is a container for Github API's pagination variables
type varmap map[string]interface{}
func newIssueVars(owner, project string, since time.Time) varmap {
return varmap{
"owner": githubv4.String(owner),
"name": githubv4.String(project),
"issueSince": githubv4.DateTime{Time: since},
"issueFirst": githubv4.Int(NumIssues),
"issueEditLast": githubv4.Int(NumIssueEdits),
"issueEditBefore": (*githubv4.String)(nil),
"timelineFirst": githubv4.Int(NumTimelineItems),
"timelineAfter": (*githubv4.String)(nil),
"commentEditLast": githubv4.Int(NumCommentEdits),
"commentEditBefore": (*githubv4.String)(nil),
}
}
func newIssueEditVars() varmap {
return varmap{
"issueEditLast": githubv4.Int(NumIssueEdits),
}
}
func newTimelineVars() varmap {
return varmap{
"timelineFirst": githubv4.Int(NumTimelineItems),
"commentEditLast": githubv4.Int(NumCommentEdits),
"commentEditBefore": (*githubv4.String)(nil),
}
}
func newCommentEditVars() varmap {
return varmap{
"commentEditLast": githubv4.Int(NumCommentEdits),
}
}