From 69f9774b932868e21609b16c3f755176c2d5d12e Mon Sep 17 00:00:00 2001 From: Azeem Shaikh Date: Fri, 29 Oct 2021 20:01:25 -0400 Subject: [PATCH] Store metadata in BigQuery (#1197) --- cron/controller/main.go | 37 ++++--- cron/data/format.go | 9 ++ cron/data/format_test.go | 41 ++++++++ cron/data/request.pb.go | 135 +++++++++++++++++++------ cron/data/request.proto | 8 +- cron/pubsub/publisher_test.go | 6 +- cron/pubsub/subscriber_gocloud_test.go | 14 ++- cron/worker/main.go | 7 +- 8 files changed, 205 insertions(+), 52 deletions(-) diff --git a/cron/controller/main.go b/cron/controller/main.go index d4e214cc..53265bda 100644 --- a/cron/controller/main.go +++ b/cron/controller/main.go @@ -30,25 +30,13 @@ import ( "github.com/ossf/scorecard/v3/pkg" ) -func publishToRepoRequestTopic(ctx context.Context, iter data.Iterator, datetime time.Time) (int32, error) { +func publishToRepoRequestTopic(iter data.Iterator, topicPublisher pubsub.Publisher, + shardSize int, datetime time.Time) (int32, error) { var shardNum int32 request := data.ScorecardBatchRequest{ JobTime: timestamppb.New(datetime), ShardNum: &shardNum, } - topic, err := config.GetRequestTopicURL() - if err != nil { - return shardNum, fmt.Errorf("error getting RequestTopicURL: %w", err) - } - topicPublisher, err := pubsub.CreatePublisher(ctx, topic) - if err != nil { - return shardNum, fmt.Errorf("error running CreatePublisher: %w", err) - } - - shardSize, err := config.GetShardSize() - if err != nil { - return shardNum, fmt.Errorf("error getting ShardSize: %w", err) - } // Create and send batch requests of repoURLs of size `ShardSize`: // * Iterate through incoming repoURLs until `request` has len(Repos) of size `ShardSize`. 
@@ -59,7 +47,10 @@ func publishToRepoRequestTopic(ctx context.Context, iter data.Iterator, datetime if err != nil { return shardNum, fmt.Errorf("error reading repoURL: %w", err) } - request.Repos = append(request.GetRepos(), repoURL.Repo) + request.Repos = append(request.GetRepos(), &data.Repo{ + Url: &repoURL.Repo, + Metadata: repoURL.Metadata.ToString(), + }) if len(request.GetRepos()) < shardSize { continue } @@ -102,6 +93,20 @@ func main() { panic(err) } + topic, err := config.GetRequestTopicURL() + if err != nil { + panic(err) + } + topicPublisher, err := pubsub.CreatePublisher(ctx, topic) + if err != nil { + panic(err) + } + + shardSize, err := config.GetShardSize() + if err != nil { + panic(err) + } + bucket, err := config.GetResultDataBucketURL() if err != nil { panic(err) @@ -112,7 +117,7 @@ func main() { panic(err) } - shardNum, err := publishToRepoRequestTopic(ctx, reader, t) + shardNum, err := publishToRepoRequestTopic(reader, topicPublisher, shardSize, t) if err != nil { panic(err) } diff --git a/cron/data/format.go b/cron/data/format.go index 2d6bb938..5500634f 100644 --- a/cron/data/format.go +++ b/cron/data/format.go @@ -34,6 +34,15 @@ func (s *CSVStrings) UnmarshalCSV(input []byte) error { return nil } +// ToString converts CSVStrings -> []string. +func (s CSVStrings) ToString() []string { + var ret []string + for _, i := range s { + ret = append(ret, i) + } + return ret +} + // RepoFormat is used to read input repos. 
type RepoFormat struct { Repo string `csv:"repo"` diff --git a/cron/data/format_test.go b/cron/data/format_test.go index 7e5d3494..5e7a3267 100644 --- a/cron/data/format_test.go +++ b/cron/data/format_test.go @@ -20,6 +20,47 @@ import ( "github.com/google/go-cmp/cmp" ) +func TestToString(t *testing.T) { + t.Parallel() + testcases := []struct { + name string + input CSVStrings + output []string + }{ + { + name: "Basic", + input: []string{"str1", "str2"}, + output: []string{"str1", "str2"}, + }, + { + name: "NilInput", + input: nil, + output: nil, + }, + { + name: "EmptyString", + input: []string{""}, + output: []string{""}, + }, + { + name: "EmptySlice", + input: make([]string, 0), + output: nil, + }, + } + + for _, testcase := range testcases { + testcase := testcase + t.Run(testcase.name, func(t *testing.T) { + t.Parallel() + actual := testcase.input.ToString() + if !cmp.Equal(testcase.output, actual) { + t.Errorf("testcase failed: expected equal, got diff: %s", cmp.Diff(testcase.output, actual)) + } + }) + } +} + func TestUnmarshalCsv(t *testing.T) { t.Parallel() testcases := []struct { diff --git a/cron/data/request.pb.go b/cron/data/request.pb.go index 044a7631..47ac968f 100644 --- a/cron/data/request.pb.go +++ b/cron/data/request.pb.go @@ -35,12 +35,67 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) +type Repo struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Url *string `protobuf:"bytes,1,opt,name=url,proto3,oneof" json:"url,omitempty"` + Metadata []string `protobuf:"bytes,2,rep,name=metadata,proto3" json:"metadata,omitempty"` +} + +func (x *Repo) Reset() { + *x = Repo{} + if protoimpl.UnsafeEnabled { + mi := &file_cron_data_request_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Repo) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Repo) ProtoMessage() {} + +func (x *Repo) 
ProtoReflect() protoreflect.Message { + mi := &file_cron_data_request_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Repo.ProtoReflect.Descriptor instead. +func (*Repo) Descriptor() ([]byte, []int) { + return file_cron_data_request_proto_rawDescGZIP(), []int{0} +} + +func (x *Repo) GetUrl() string { + if x != nil && x.Url != nil { + return *x.Url + } + return "" +} + +func (x *Repo) GetMetadata() []string { + if x != nil { + return x.Metadata + } + return nil +} + type ScorecardBatchRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Repos []string `protobuf:"bytes,1,rep,name=repos,proto3" json:"repos,omitempty"` + Repos []*Repo `protobuf:"bytes,4,rep,name=repos,proto3" json:"repos,omitempty"` ShardNum *int32 `protobuf:"varint,2,opt,name=shard_num,json=shardNum,proto3,oneof" json:"shard_num,omitempty"` JobTime *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=job_time,json=jobTime,proto3,oneof" json:"job_time,omitempty"` } @@ -48,7 +103,7 @@ type ScorecardBatchRequest struct { func (x *ScorecardBatchRequest) Reset() { *x = ScorecardBatchRequest{} if protoimpl.UnsafeEnabled { - mi := &file_cron_data_request_proto_msgTypes[0] + mi := &file_cron_data_request_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -61,7 +116,7 @@ func (x *ScorecardBatchRequest) String() string { func (*ScorecardBatchRequest) ProtoMessage() {} func (x *ScorecardBatchRequest) ProtoReflect() protoreflect.Message { - mi := &file_cron_data_request_proto_msgTypes[0] + mi := &file_cron_data_request_proto_msgTypes[1] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -74,10 +129,10 @@ func (x 
*ScorecardBatchRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use ScorecardBatchRequest.ProtoReflect.Descriptor instead. func (*ScorecardBatchRequest) Descriptor() ([]byte, []int) { - return file_cron_data_request_proto_rawDescGZIP(), []int{0} + return file_cron_data_request_proto_rawDescGZIP(), []int{1} } -func (x *ScorecardBatchRequest) GetRepos() []string { +func (x *ScorecardBatchRequest) GetRepos() []*Repo { if x != nil { return x.Repos } @@ -106,20 +161,27 @@ var file_cron_data_request_proto_rawDesc = []byte{ 0x73, 0x63, 0x6f, 0x72, 0x65, 0x63, 0x61, 0x72, 0x64, 0x2e, 0x63, 0x72, 0x6f, 0x6e, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xa6, 0x01, 0x0a, 0x15, 0x53, 0x63, 0x6f, 0x72, 0x65, 0x63, 0x61, - 0x72, 0x64, 0x42, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, - 0x0a, 0x05, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x05, 0x72, - 0x65, 0x70, 0x6f, 0x73, 0x12, 0x20, 0x0a, 0x09, 0x73, 0x68, 0x61, 0x72, 0x64, 0x5f, 0x6e, 0x75, - 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x48, 0x00, 0x52, 0x08, 0x73, 0x68, 0x61, 0x72, 0x64, - 0x4e, 0x75, 0x6d, 0x88, 0x01, 0x01, 0x12, 0x3a, 0x0a, 0x08, 0x6a, 0x6f, 0x62, 0x5f, 0x74, 0x69, - 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, - 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, - 0x74, 0x61, 0x6d, 0x70, 0x48, 0x01, 0x52, 0x07, 0x6a, 0x6f, 0x62, 0x54, 0x69, 0x6d, 0x65, 0x88, - 0x01, 0x01, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x73, 0x68, 0x61, 0x72, 0x64, 0x5f, 0x6e, 0x75, 0x6d, - 0x42, 0x0b, 0x0a, 0x09, 0x5f, 0x6a, 0x6f, 0x62, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x42, 0x25, 0x5a, - 0x23, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6f, 0x73, 0x73, 0x66, 
- 0x2f, 0x73, 0x63, 0x6f, 0x72, 0x65, 0x63, 0x61, 0x72, 0x64, 0x2f, 0x63, 0x72, 0x6f, 0x6e, 0x2f, - 0x64, 0x61, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x41, 0x0a, 0x04, 0x52, 0x65, 0x70, 0x6f, 0x12, 0x15, 0x0a, 0x03, + 0x75, 0x72, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x03, 0x75, 0x72, 0x6c, + 0x88, 0x01, 0x01, 0x12, 0x1a, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, + 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x42, + 0x06, 0x0a, 0x04, 0x5f, 0x75, 0x72, 0x6c, 0x22, 0xcc, 0x01, 0x0a, 0x15, 0x53, 0x63, 0x6f, 0x72, + 0x65, 0x63, 0x61, 0x72, 0x64, 0x42, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x34, 0x0a, 0x05, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, + 0x32, 0x1e, 0x2e, 0x6f, 0x73, 0x73, 0x66, 0x2e, 0x73, 0x63, 0x6f, 0x72, 0x65, 0x63, 0x61, 0x72, + 0x64, 0x2e, 0x63, 0x72, 0x6f, 0x6e, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x52, 0x65, 0x70, 0x6f, + 0x52, 0x05, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x12, 0x20, 0x0a, 0x09, 0x73, 0x68, 0x61, 0x72, 0x64, + 0x5f, 0x6e, 0x75, 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x48, 0x00, 0x52, 0x08, 0x73, 0x68, + 0x61, 0x72, 0x64, 0x4e, 0x75, 0x6d, 0x88, 0x01, 0x01, 0x12, 0x3a, 0x0a, 0x08, 0x6a, 0x6f, 0x62, + 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, + 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, + 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x48, 0x01, 0x52, 0x07, 0x6a, 0x6f, 0x62, 0x54, 0x69, + 0x6d, 0x65, 0x88, 0x01, 0x01, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x73, 0x68, 0x61, 0x72, 0x64, 0x5f, + 0x6e, 0x75, 0x6d, 0x42, 0x0b, 0x0a, 0x09, 0x5f, 0x6a, 0x6f, 0x62, 0x5f, 0x74, 0x69, 0x6d, 0x65, + 0x4a, 0x04, 0x08, 0x01, 0x10, 0x02, 0x42, 0x25, 0x5a, 0x23, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, + 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6f, 0x73, 0x73, 0x66, 0x2f, 
0x73, 0x63, 0x6f, 0x72, 0x65, 0x63, + 0x61, 0x72, 0x64, 0x2f, 0x63, 0x72, 0x6f, 0x6e, 0x2f, 0x64, 0x61, 0x74, 0x61, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -134,18 +196,20 @@ func file_cron_data_request_proto_rawDescGZIP() []byte { return file_cron_data_request_proto_rawDescData } -var file_cron_data_request_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_cron_data_request_proto_msgTypes = make([]protoimpl.MessageInfo, 2) var file_cron_data_request_proto_goTypes = []interface{}{ - (*ScorecardBatchRequest)(nil), // 0: ossf.scorecard.cron.data.ScorecardBatchRequest - (*timestamppb.Timestamp)(nil), // 1: google.protobuf.Timestamp + (*Repo)(nil), // 0: ossf.scorecard.cron.data.Repo + (*ScorecardBatchRequest)(nil), // 1: ossf.scorecard.cron.data.ScorecardBatchRequest + (*timestamppb.Timestamp)(nil), // 2: google.protobuf.Timestamp } var file_cron_data_request_proto_depIdxs = []int32{ - 1, // 0: ossf.scorecard.cron.data.ScorecardBatchRequest.job_time:type_name -> google.protobuf.Timestamp - 1, // [1:1] is the sub-list for method output_type - 1, // [1:1] is the sub-list for method input_type - 1, // [1:1] is the sub-list for extension type_name - 1, // [1:1] is the sub-list for extension extendee - 0, // [0:1] is the sub-list for field type_name + 0, // 0: ossf.scorecard.cron.data.ScorecardBatchRequest.repos:type_name -> ossf.scorecard.cron.data.Repo + 2, // 1: ossf.scorecard.cron.data.ScorecardBatchRequest.job_time:type_name -> google.protobuf.Timestamp + 2, // [2:2] is the sub-list for method output_type + 2, // [2:2] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name } func init() { file_cron_data_request_proto_init() } @@ -155,6 +219,18 @@ func file_cron_data_request_proto_init() { } if !protoimpl.UnsafeEnabled { file_cron_data_request_proto_msgTypes[0].Exporter = func(v interface{}, i int) 
interface{} { + switch v := v.(*Repo); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_cron_data_request_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*ScorecardBatchRequest); i { case 0: return &v.state @@ -168,13 +244,14 @@ func file_cron_data_request_proto_init() { } } file_cron_data_request_proto_msgTypes[0].OneofWrappers = []interface{}{} + file_cron_data_request_proto_msgTypes[1].OneofWrappers = []interface{}{} type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_cron_data_request_proto_rawDesc, NumEnums: 0, - NumMessages: 1, + NumMessages: 2, NumExtensions: 0, NumServices: 0, }, diff --git a/cron/data/request.proto b/cron/data/request.proto index 68117cb8..dab6d3e9 100644 --- a/cron/data/request.proto +++ b/cron/data/request.proto @@ -20,8 +20,14 @@ import "google/protobuf/timestamp.proto"; option go_package = "github.com/ossf/scorecard/cron/data"; +message Repo { + optional string url = 1; + repeated string metadata = 2; +} + message ScorecardBatchRequest { - repeated string repos = 1; + repeated Repo repos = 4; optional int32 shard_num = 2; optional google.protobuf.Timestamp job_time = 3; + reserved 1; } diff --git a/cron/pubsub/publisher_test.go b/cron/pubsub/publisher_test.go index 86e41074..1a5997dc 100644 --- a/cron/pubsub/publisher_test.go +++ b/cron/pubsub/publisher_test.go @@ -70,7 +70,11 @@ func TestPublish(t *testing.T) { topic: testcase.topic, } request := data.ScorecardBatchRequest{ - Repos: []string{"repo1"}, + Repos: []*data.Repo{ + { + Url: &repo1, + }, + }, } if err := publisher.Publish(&request); err != nil { t.Errorf("Failed to parse message: %v", err) diff --git a/cron/pubsub/subscriber_gocloud_test.go b/cron/pubsub/subscriber_gocloud_test.go index c9441806..8ba4e228 100644 --- a/cron/pubsub/subscriber_gocloud_test.go +++ 
b/cron/pubsub/subscriber_gocloud_test.go
@@ -26,6 +26,8 @@ import (
 	"github.com/ossf/scorecard/v3/cron/data"
 )
 
+var repo1 = "repo1"
+
 type mockReceiver struct {
 	msg          *pubsub.Message
 	errOnReceive error
@@ -53,7 +55,11 @@ func TestSubscriber(t *testing.T) {
 		{
 			name: "Basic",
 			msg: &data.ScorecardBatchRequest{
-				Repos: []string{"repo1"},
+				Repos: []*data.Repo{
+					{
+						Url: &repo1,
+					},
+				},
 			},
 		},
 		{
@@ -65,7 +71,11 @@ func TestSubscriber(t *testing.T) {
 		{
 			name: "ShutdownFails",
 			msg: &data.ScorecardBatchRequest{
-				Repos: []string{"repo1"},
+				Repos: []*data.Repo{
+					{
+						Url: &repo1,
+					},
+				},
 			},
 			hasErrOnShutdown: true,
 			// nolint: goerr113
diff --git a/cron/worker/main.go b/cron/worker/main.go
index df5d5fac..73e45c40 100644
--- a/cron/worker/main.go
+++ b/cron/worker/main.go
@@ -74,13 +74,14 @@ func processRequest(ctx context.Context,
 	var buffer bytes.Buffer
 	var buffer2 bytes.Buffer
 	// TODO: run Scorecard for each repo in a separate thread.
-	for _, repoURL := range batchRequest.GetRepos() {
-		logger.Info(fmt.Sprintf("Running Scorecard for repo: %s", repoURL))
-		repo, err := githubrepo.MakeGithubRepo(repoURL)
+	for _, repoReq := range batchRequest.GetRepos() {
+		logger.Info(fmt.Sprintf("Running Scorecard for repo: %s", repoReq.GetUrl()))
+		repo, err := githubrepo.MakeGithubRepo(repoReq.GetUrl())
 		if err != nil {
 			logger.Warn(fmt.Sprintf("invalid GitHub URL: %v", err))
 			continue
 		}
+		repo.AppendMetadata(repoReq.GetMetadata()...) // Metadata comes from the request, not the new repo.
 		result, err := pkg.RunScorecards(ctx, repo, checksToRun, repoClient)
 		if errors.Is(err, sce.ErrRepoUnreachable) {
 			// Not accessible repo - continue.