Bug fixes and additional logs for PubSub worker (#507)

Co-authored-by: Azeem Shaikh <azeems@google.com>
This commit is contained in:
Azeem Shaikh 2021-05-25 15:12:47 -07:00 committed by GitHub
parent 638a5f80f8
commit 34fccb60b4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -17,11 +17,10 @@ package main
import (
"bytes"
"context"
"errors"
"fmt"
"log"
"net/http"
"os"
"sync"
"github.com/google/go-github/v32/github"
"github.com/shurcooL/githubv4"
@ -39,10 +38,6 @@ import (
func processRequest(ctx context.Context,
batchRequest *data.ScorecardBatchRequest, bucketURL string,
httpClient *http.Client, githubClient *github.Client, graphClient *githubv4.Client) error {
repoURLs := make([]repos.RepoURL, 0, len(batchRequest.GetRepos()))
var resultCh chan repos.RepoResult
var buffer bytes.Buffer
checksToRun := checks.AllChecks
// nolint
// FIXME :- deleting branch-protection
@ -51,6 +46,7 @@ func processRequest(ctx context.Context,
// This will reduce usage of the API.
delete(checksToRun, "Branch-Protection")
repoURLs := make([]repos.RepoURL, 0, len(batchRequest.GetRepos()))
for _, repo := range batchRequest.GetRepos() {
repoURL := repos.RepoURL{}
if err := repoURL.Set(repo); err != nil {
@ -61,20 +57,13 @@ func processRequest(ctx context.Context,
}
repoURLs = append(repoURLs, repoURL)
}
go func() {
var wg sync.WaitGroup
for _, repoURL := range repoURLs {
repoURL := repoURL
wg.Add(1)
go func() {
resultCh <- pkg.RunScorecards(ctx, repoURL, checksToRun, httpClient, githubClient, graphClient)
wg.Done()
}()
}
wg.Wait()
}()
for result := range resultCh {
var buffer bytes.Buffer
// TODO: run Scorecard for each repo in a separate thread.
for _, repoURL := range repoURLs {
log.Printf("Running Scorecard for repo: %s", repoURL.URL())
result := pkg.RunScorecards(ctx, repoURL, checksToRun, httpClient, githubClient, graphClient)
result.Date = batchRequest.GetJobTime().AsTime().Format("2006-01-02")
err := result.AsJSON(true /*showDetails*/, &buffer)
if err != nil {
return fmt.Errorf("error during result.AsJSON: %w", err)
@ -84,11 +73,15 @@ func processRequest(ctx context.Context,
filename := data.GetBlobFilename(
fmt.Sprintf("shard-%05d", batchRequest.GetShardNum()),
batchRequest.GetJobTime().AsTime())
return errors.Unwrap(data.WriteToBlobStore(ctx, bucketURL, filename, buffer.Bytes()))
if err := data.WriteToBlobStore(ctx, bucketURL, filename, buffer.Bytes()); err != nil {
return fmt.Errorf("error during WriteToBlobStore: %w", err)
}
log.Printf("Write to shard file successful: %s", filename)
return nil
}
func createNetClients(ctx context.Context) (httpClient *http.Client,
githubClient *github.Client, graphClient *githubv4.Client) {
func createNetClients(ctx context.Context) (
httpClient *http.Client, githubClient *github.Client, graphClient *githubv4.Client, logger *zap.Logger) {
cfg := zap.NewProductionConfig()
cfg.Level.SetLevel(zap.InfoLevel)
logger, err := cfg.Build()
@ -133,20 +126,24 @@ func main() {
// nolint: goerr113
panic(fmt.Errorf("env_vars %s must be set", roundtripper.BucketURL))
}
httpClient, githubClient, graphClient := createNetClients(ctx)
httpClient, githubClient, graphClient, logger := createNetClients(ctx)
for {
req, err := subscriber.SynchronousPull()
if err != nil {
panic(err)
}
log.Print("Received message from subscription")
if req == nil {
fmt.Printf("subscription returned nil message during Receive, exiting")
log.Print("subscription returned nil message during Receive, exiting")
break
}
if err := processRequest(ctx, req, bucketURL, httpClient, githubClient, graphClient); err != nil {
panic(err)
}
// nolint: errcheck // flushes buffer
logger.Sync()
subscriber.Ack()
}
err = subscriber.Close()