Store metadata in BigQuery (#1197)

This commit is contained in:
Azeem Shaikh 2021-10-29 20:01:25 -04:00 committed by GitHub
parent c7511206a1
commit 69f9774b93
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 205 additions and 52 deletions

View File

@ -30,25 +30,13 @@ import (
"github.com/ossf/scorecard/v3/pkg"
)
func publishToRepoRequestTopic(ctx context.Context, iter data.Iterator, datetime time.Time) (int32, error) {
func publishToRepoRequestTopic(iter data.Iterator, topicPublisher pubsub.Publisher,
shardSize int, datetime time.Time) (int32, error) {
var shardNum int32
request := data.ScorecardBatchRequest{
JobTime: timestamppb.New(datetime),
ShardNum: &shardNum,
}
topic, err := config.GetRequestTopicURL()
if err != nil {
return shardNum, fmt.Errorf("error getting RequestTopicURL: %w", err)
}
topicPublisher, err := pubsub.CreatePublisher(ctx, topic)
if err != nil {
return shardNum, fmt.Errorf("error running CreatePublisher: %w", err)
}
shardSize, err := config.GetShardSize()
if err != nil {
return shardNum, fmt.Errorf("error getting ShardSize: %w", err)
}
// Create and send batch requests of repoURLs of size `ShardSize`:
// * Iterate through incoming repoURLs until `request` has len(Repos) of size `ShardSize`.
@ -59,7 +47,10 @@ func publishToRepoRequestTopic(ctx context.Context, iter data.Iterator, datetime
if err != nil {
return shardNum, fmt.Errorf("error reading repoURL: %w", err)
}
request.Repos = append(request.GetRepos(), repoURL.Repo)
request.Repos = append(request.GetRepos(), &data.Repo{
Url: &repoURL.Repo,
Metadata: repoURL.Metadata.ToString(),
})
if len(request.GetRepos()) < shardSize {
continue
}
@ -102,6 +93,20 @@ func main() {
panic(err)
}
topic, err := config.GetRequestTopicURL()
if err != nil {
panic(err)
}
topicPublisher, err := pubsub.CreatePublisher(ctx, topic)
if err != nil {
panic(err)
}
shardSize, err := config.GetShardSize()
if err != nil {
panic(err)
}
bucket, err := config.GetResultDataBucketURL()
if err != nil {
panic(err)
@ -112,7 +117,7 @@ func main() {
panic(err)
}
shardNum, err := publishToRepoRequestTopic(ctx, reader, t)
shardNum, err := publishToRepoRequestTopic(reader, topicPublisher, shardSize, t)
if err != nil {
panic(err)
}

View File

@ -34,6 +34,15 @@ func (s *CSVStrings) UnmarshalCSV(input []byte) error {
return nil
}
// ToString converts CSVStrings -> []string.
// The returned slice is a fresh copy of the receiver's elements; a nil or empty
// receiver produces a nil slice.
func (s CSVStrings) ToString() []string {
	// Appending onto a nil slice copies every element and leaves the result nil
	// when there is nothing to copy.
	return append([]string(nil), s...)
}
// RepoFormat is used to read input repos.
type RepoFormat struct {
Repo string `csv:"repo"`

View File

@ -20,6 +20,47 @@ import (
"github.com/google/go-cmp/cmp"
)
// TestToString checks CSVStrings.ToString against a table of inputs, including
// nil and empty receivers, both of which are expected to yield a nil slice.
func TestToString(t *testing.T) {
	t.Parallel()

	type scenario struct {
		name   string
		input  CSVStrings
		output []string
	}
	scenarios := []scenario{
		{name: "Basic", input: []string{"str1", "str2"}, output: []string{"str1", "str2"}},
		{name: "NilInput", input: nil, output: nil},
		{name: "EmptyString", input: []string{""}, output: []string{""}},
		{name: "EmptySlice", input: make([]string, 0), output: nil},
	}

	for i := range scenarios {
		// Take a per-iteration copy so the parallel subtest does not observe a
		// later table entry.
		tc := scenarios[i]
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()
			got := tc.input.ToString()
			if cmp.Equal(tc.output, got) {
				return
			}
			t.Errorf("testcase failed: expected equal, got diff: %s", cmp.Diff(tc.output, got))
		})
	}
}
func TestUnmarshalCsv(t *testing.T) {
t.Parallel()
testcases := []struct {

View File

@ -35,12 +35,67 @@ const (
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
// Repo is the Go type for the ossf.scorecard.cron.data.Repo protobuf message.
type Repo struct {
// state, sizeCache and unknownFields are bookkeeping fields typed by protoimpl.
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// Url is proto field 1 (optional); it is nil when unset, in which case GetUrl returns "".
Url *string `protobuf:"bytes,1,opt,name=url,proto3,oneof" json:"url,omitempty"`
// Metadata is proto field 2, a repeated string.
Metadata []string `protobuf:"bytes,2,rep,name=metadata,proto3" json:"metadata,omitempty"`
}
// Reset overwrites x with a zero-valued Repo. When protoimpl.UnsafeEnabled is
// set, it also stores the message info for message type index 0 into x's
// message state.
func (x *Repo) Reset() {
*x = Repo{}
if protoimpl.UnsafeEnabled {
mi := &file_cron_data_request_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
// String returns the string form of x as produced by protoimpl.X.MessageStringOf.
func (x *Repo) String() string {
return protoimpl.X.MessageStringOf(x)
}
// ProtoMessage has an empty body; presumably it only marks *Repo as a protobuf
// message type (confirm against the protobuf-go documentation).
func (*Repo) ProtoMessage() {}
// ProtoReflect returns the protoreflect.Message view of x.
// With protoimpl.UnsafeEnabled and a non-nil x, it returns x's message state,
// first storing the message info for type index 0 if none is loaded yet.
// Otherwise it falls back to mi.MessageOf(x).
func (x *Repo) ProtoReflect() protoreflect.Message {
mi := &file_cron_data_request_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Descriptor returns the bytes from file_cron_data_request_proto_rawDescGZIP
// together with the index path []int{0} for this message.
//
// Deprecated: Use Repo.ProtoReflect.Descriptor instead.
func (*Repo) Descriptor() ([]byte, []int) {
return file_cron_data_request_proto_rawDescGZIP(), []int{0}
}
// GetUrl returns the value pointed to by x.Url, or "" when x is nil or the
// field is unset.
func (x *Repo) GetUrl() string {
	if x == nil || x.Url == nil {
		return ""
	}
	return *x.Url
}
// GetMetadata returns x.Metadata, or nil when x itself is nil.
func (x *Repo) GetMetadata() []string {
	if x == nil {
		return nil
	}
	return x.Metadata
}
type ScorecardBatchRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Repos []string `protobuf:"bytes,1,rep,name=repos,proto3" json:"repos,omitempty"`
Repos []*Repo `protobuf:"bytes,4,rep,name=repos,proto3" json:"repos,omitempty"`
ShardNum *int32 `protobuf:"varint,2,opt,name=shard_num,json=shardNum,proto3,oneof" json:"shard_num,omitempty"`
JobTime *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=job_time,json=jobTime,proto3,oneof" json:"job_time,omitempty"`
}
@ -48,7 +103,7 @@ type ScorecardBatchRequest struct {
func (x *ScorecardBatchRequest) Reset() {
*x = ScorecardBatchRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_cron_data_request_proto_msgTypes[0]
mi := &file_cron_data_request_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -61,7 +116,7 @@ func (x *ScorecardBatchRequest) String() string {
func (*ScorecardBatchRequest) ProtoMessage() {}
func (x *ScorecardBatchRequest) ProtoReflect() protoreflect.Message {
mi := &file_cron_data_request_proto_msgTypes[0]
mi := &file_cron_data_request_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -74,10 +129,10 @@ func (x *ScorecardBatchRequest) ProtoReflect() protoreflect.Message {
// Deprecated: Use ScorecardBatchRequest.ProtoReflect.Descriptor instead.
func (*ScorecardBatchRequest) Descriptor() ([]byte, []int) {
return file_cron_data_request_proto_rawDescGZIP(), []int{0}
return file_cron_data_request_proto_rawDescGZIP(), []int{1}
}
func (x *ScorecardBatchRequest) GetRepos() []string {
func (x *ScorecardBatchRequest) GetRepos() []*Repo {
if x != nil {
return x.Repos
}
@ -106,20 +161,27 @@ var file_cron_data_request_proto_rawDesc = []byte{
0x73, 0x63, 0x6f, 0x72, 0x65, 0x63, 0x61, 0x72, 0x64, 0x2e, 0x63, 0x72, 0x6f, 0x6e, 0x2e, 0x64,
0x61, 0x74, 0x61, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x22, 0xa6, 0x01, 0x0a, 0x15, 0x53, 0x63, 0x6f, 0x72, 0x65, 0x63, 0x61,
0x72, 0x64, 0x42, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14,
0x0a, 0x05, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x05, 0x72,
0x65, 0x70, 0x6f, 0x73, 0x12, 0x20, 0x0a, 0x09, 0x73, 0x68, 0x61, 0x72, 0x64, 0x5f, 0x6e, 0x75,
0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x48, 0x00, 0x52, 0x08, 0x73, 0x68, 0x61, 0x72, 0x64,
0x4e, 0x75, 0x6d, 0x88, 0x01, 0x01, 0x12, 0x3a, 0x0a, 0x08, 0x6a, 0x6f, 0x62, 0x5f, 0x74, 0x69,
0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c,
0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73,
0x74, 0x61, 0x6d, 0x70, 0x48, 0x01, 0x52, 0x07, 0x6a, 0x6f, 0x62, 0x54, 0x69, 0x6d, 0x65, 0x88,
0x01, 0x01, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x73, 0x68, 0x61, 0x72, 0x64, 0x5f, 0x6e, 0x75, 0x6d,
0x42, 0x0b, 0x0a, 0x09, 0x5f, 0x6a, 0x6f, 0x62, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x42, 0x25, 0x5a,
0x23, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6f, 0x73, 0x73, 0x66,
0x2f, 0x73, 0x63, 0x6f, 0x72, 0x65, 0x63, 0x61, 0x72, 0x64, 0x2f, 0x63, 0x72, 0x6f, 0x6e, 0x2f,
0x64, 0x61, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x72, 0x6f, 0x74, 0x6f, 0x22, 0x41, 0x0a, 0x04, 0x52, 0x65, 0x70, 0x6f, 0x12, 0x15, 0x0a, 0x03,
0x75, 0x72, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x03, 0x75, 0x72, 0x6c,
0x88, 0x01, 0x01, 0x12, 0x1a, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18,
0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x42,
0x06, 0x0a, 0x04, 0x5f, 0x75, 0x72, 0x6c, 0x22, 0xcc, 0x01, 0x0a, 0x15, 0x53, 0x63, 0x6f, 0x72,
0x65, 0x63, 0x61, 0x72, 0x64, 0x42, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x12, 0x34, 0x0a, 0x05, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b,
0x32, 0x1e, 0x2e, 0x6f, 0x73, 0x73, 0x66, 0x2e, 0x73, 0x63, 0x6f, 0x72, 0x65, 0x63, 0x61, 0x72,
0x64, 0x2e, 0x63, 0x72, 0x6f, 0x6e, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x52, 0x65, 0x70, 0x6f,
0x52, 0x05, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x12, 0x20, 0x0a, 0x09, 0x73, 0x68, 0x61, 0x72, 0x64,
0x5f, 0x6e, 0x75, 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x48, 0x00, 0x52, 0x08, 0x73, 0x68,
0x61, 0x72, 0x64, 0x4e, 0x75, 0x6d, 0x88, 0x01, 0x01, 0x12, 0x3a, 0x0a, 0x08, 0x6a, 0x6f, 0x62,
0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f,
0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69,
0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x48, 0x01, 0x52, 0x07, 0x6a, 0x6f, 0x62, 0x54, 0x69,
0x6d, 0x65, 0x88, 0x01, 0x01, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x73, 0x68, 0x61, 0x72, 0x64, 0x5f,
0x6e, 0x75, 0x6d, 0x42, 0x0b, 0x0a, 0x09, 0x5f, 0x6a, 0x6f, 0x62, 0x5f, 0x74, 0x69, 0x6d, 0x65,
0x4a, 0x04, 0x08, 0x01, 0x10, 0x02, 0x42, 0x25, 0x5a, 0x23, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62,
0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6f, 0x73, 0x73, 0x66, 0x2f, 0x73, 0x63, 0x6f, 0x72, 0x65, 0x63,
0x61, 0x72, 0x64, 0x2f, 0x63, 0x72, 0x6f, 0x6e, 0x2f, 0x64, 0x61, 0x74, 0x61, 0x62, 0x06, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@ -134,18 +196,20 @@ func file_cron_data_request_proto_rawDescGZIP() []byte {
return file_cron_data_request_proto_rawDescData
}
var file_cron_data_request_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
var file_cron_data_request_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_cron_data_request_proto_goTypes = []interface{}{
(*ScorecardBatchRequest)(nil), // 0: ossf.scorecard.cron.data.ScorecardBatchRequest
(*timestamppb.Timestamp)(nil), // 1: google.protobuf.Timestamp
(*Repo)(nil), // 0: ossf.scorecard.cron.data.Repo
(*ScorecardBatchRequest)(nil), // 1: ossf.scorecard.cron.data.ScorecardBatchRequest
(*timestamppb.Timestamp)(nil), // 2: google.protobuf.Timestamp
}
var file_cron_data_request_proto_depIdxs = []int32{
1, // 0: ossf.scorecard.cron.data.ScorecardBatchRequest.job_time:type_name -> google.protobuf.Timestamp
1, // [1:1] is the sub-list for method output_type
1, // [1:1] is the sub-list for method input_type
1, // [1:1] is the sub-list for extension type_name
1, // [1:1] is the sub-list for extension extendee
0, // [0:1] is the sub-list for field type_name
0, // 0: ossf.scorecard.cron.data.ScorecardBatchRequest.repos:type_name -> ossf.scorecard.cron.data.Repo
2, // 1: ossf.scorecard.cron.data.ScorecardBatchRequest.job_time:type_name -> google.protobuf.Timestamp
2, // [2:2] is the sub-list for method output_type
2, // [2:2] is the sub-list for method input_type
2, // [2:2] is the sub-list for extension type_name
2, // [2:2] is the sub-list for extension extendee
0, // [0:2] is the sub-list for field type_name
}
func init() { file_cron_data_request_proto_init() }
@ -155,6 +219,18 @@ func file_cron_data_request_proto_init() {
}
if !protoimpl.UnsafeEnabled {
file_cron_data_request_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Repo); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_cron_data_request_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ScorecardBatchRequest); i {
case 0:
return &v.state
@ -168,13 +244,14 @@ func file_cron_data_request_proto_init() {
}
}
file_cron_data_request_proto_msgTypes[0].OneofWrappers = []interface{}{}
file_cron_data_request_proto_msgTypes[1].OneofWrappers = []interface{}{}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_cron_data_request_proto_rawDesc,
NumEnums: 0,
NumMessages: 1,
NumMessages: 2,
NumExtensions: 0,
NumServices: 0,
},

View File

@ -20,8 +20,14 @@ import "google/protobuf/timestamp.proto";
option go_package = "github.com/ossf/scorecard/cron/data";
// Repo describes one repository entry carried in a ScorecardBatchRequest.
message Repo {
// URL of the repository.
optional string url = 1;
// Metadata strings associated with the repository.
repeated string metadata = 2;
}
message ScorecardBatchRequest {
repeated string repos = 1;
repeated Repo repos = 4;
optional int32 shard_num = 2;
optional google.protobuf.Timestamp job_time = 3;
reserved 1;
}

View File

@ -70,7 +70,11 @@ func TestPublish(t *testing.T) {
topic: testcase.topic,
}
request := data.ScorecardBatchRequest{
Repos: []string{"repo1"},
Repos: []*data.Repo{
{
Url: &repo1,
},
},
}
if err := publisher.Publish(&request); err != nil {
t.Errorf("Failed to parse message: %v", err)

View File

@ -26,6 +26,8 @@ import (
"github.com/ossf/scorecard/v3/cron/data"
)
// repo1 is declared as a variable so the tests can take its address (&repo1)
// for the pointer-typed data.Repo.Url field.
var repo1 = "repo1"
type mockReceiver struct {
msg *pubsub.Message
errOnReceive error
@ -53,7 +55,11 @@ func TestSubscriber(t *testing.T) {
{
name: "Basic",
msg: &data.ScorecardBatchRequest{
Repos: []string{"repo1"},
Repos: []*data.Repo{
{
Url: &repo1,
},
},
},
},
{
@ -65,7 +71,11 @@ func TestSubscriber(t *testing.T) {
{
name: "ShutdownFails",
msg: &data.ScorecardBatchRequest{
Repos: []string{"repo1"},
Repos: []*data.Repo{
{
Url: &repo1,
},
},
},
hasErrOnShutdown: true,
// nolint: goerr113

View File

@ -74,13 +74,14 @@ func processRequest(ctx context.Context,
var buffer bytes.Buffer
var buffer2 bytes.Buffer
// TODO: run Scorecard for each repo in a separate thread.
for _, repoURL := range batchRequest.GetRepos() {
logger.Info(fmt.Sprintf("Running Scorecard for repo: %s", repoURL))
repo, err := githubrepo.MakeGithubRepo(repoURL)
// The request entry is named repoReq so that it is not shadowed by the
// githubrepo value created below: the metadata to attach must come from the
// request, not from the freshly built repo (which would append its own
// metadata to itself).
for _, repoReq := range batchRequest.GetRepos() {
	// GetUrl is nil-safe: an entry without a URL yields "" instead of a nil
	// pointer dereference; presumably MakeGithubRepo then rejects it and the
	// entry is skipped below.
	logger.Info(fmt.Sprintf("Running Scorecard for repo: %s", repoReq.GetUrl()))
	repo, err := githubrepo.MakeGithubRepo(repoReq.GetUrl())
	if err != nil {
		logger.Warn(fmt.Sprintf("invalid GitHub URL: %v", err))
		continue
	}
	repo.AppendMetadata(repoReq.GetMetadata()...)
result, err := pkg.RunScorecards(ctx, repo, checksToRun, repoClient)
if errors.Is(err, sce.ErrRepoUnreachable) {
// Not accessible repo - continue.