Setup PubSub framework code. (#428)

Co-authored-by: Azeem Shaikh <azeems@google.com>
This commit is contained in:
Azeem Shaikh 2021-05-14 14:32:23 -07:00 committed by GitHub
parent 670e1980d5
commit 6437c9324f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 3808 additions and 6 deletions

24
cron/config/config.go Normal file
View File

@ -0,0 +1,24 @@
// Copyright 2021 Security Scorecard Authors
//
// Licensed under the Apache License, Vershandlern 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permisshandlerns and
// limitathandlerns under the License.
package config
// TODO(Azeem Shaikh): Use config.yaml to store these values and allow users to override these values using ENV vars.
const (
ResultDataBucketURL string = "gs://ossf-scorecard-data"
RequestTopicURL = "gcppubsub://projects/openssf/topics/scorecard-batch-requests"
InputReposFile = "projects.csv"
ShardNumFilename = ".shard_num"
ShardSize int = 250
)

2883
cron/config/projects.csv Normal file

File diff suppressed because it is too large Load Diff

94
cron/controller/main.go Normal file
View File

@ -0,0 +1,94 @@
// Copyright 2021 Security Scorecard Authors
//
// Licensed under the Apache License, Vershandlern 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permisshandlerns and
// limitathandlerns under the License.
package main
import (
"context"
"fmt"
"os"
"strconv"
"time"
"github.com/ossf/scorecard/cron/config"
"github.com/ossf/scorecard/cron/data"
"github.com/ossf/scorecard/cron/pubsub"
"google.golang.org/protobuf/types/known/timestamppb"
)
func PublishToRepoRequestTopic(ctx context.Context, iter data.Iterator, datetime time.Time) (int32, error) {
var shardNum int32 = 0
request := data.ScorecardBatchRequest{
JobTime: timestamppb.New(datetime),
ShardNum: &shardNum,
}
topicPublisher, err := pubsub.CreatePublisher(ctx, config.RequestTopicURL)
if err != nil {
return shardNum, fmt.Errorf("error running CreatePublisher: %w", err)
}
// Create and send batch requests of repoURLs of size `ShardSize`:
// * Iterate through incoming repoURLs until `request` has len(Repos) of size `ShardSize`.
// * Publish request to PubSub topic.
// * Clear request.Repos and increment shardNum.
for iter.HasNext() {
repoURL, err := iter.Next()
if err != nil {
return shardNum, fmt.Errorf("error reading repoURL: %w", err)
}
request.Repos = append(request.GetRepos(), repoURL.URL())
if len(request.GetRepos()) < config.ShardSize {
continue
}
if err := topicPublisher.Publish(&request); err != nil {
return shardNum, fmt.Errorf("error running topicPublisher.Publish: %w", err)
}
request.Repos = nil
shardNum++
}
// Check if more repoURLs are pending to be sent in `request`.
if len(request.GetRepos()) > 0 {
if err := topicPublisher.Publish(&request); err != nil {
return shardNum, fmt.Errorf("error running topicPublisher.Publish: %w", err)
}
}
if err := topicPublisher.Close(); err != nil {
return shardNum, fmt.Errorf("error running topicPublisher.Close: %w", err)
}
return shardNum, nil
}
func main() {
ctx := context.Background()
t := time.Now()
reposFile, err := os.OpenFile(config.InputReposFile, os.O_RDONLY, 0o644)
if err != nil {
panic(err)
}
reader, err := data.MakeIterator(reposFile)
if err != nil {
panic(err)
}
shardNum, err := PublishToRepoRequestTopic(ctx, reader, t)
if err != nil {
panic(err)
}
err = data.WriteToBlobStore(ctx, config.ResultDataBucketURL,
data.GetShardNumFilename(t),
[]byte(strconv.Itoa(int(shardNum))))
if err != nil {
panic(err)
}
}

22
cron/data/README.md Normal file
View File

@ -0,0 +1,22 @@
# Generating proto files
## Installation
Follow instructions
[here](https://developers.google.com/protocol-buffers/docs/gotutorial#compiling-your-protocol-buffers)
to install necessary binaries.
## Compile
Use the command below to compile:
```
protoc --go_out=$DST_DIR request.proto
```
NOTE: $DST_DIR should contain `github.com/ossf/scorecard/cron/data` directory
structure.
## Future work
Update Makefile to compile and generate proto files, when we run `make all`.

59
cron/data/blob.go Normal file
View File

@ -0,0 +1,59 @@
// Copyright 2021 Security Scorecard Authors
//
// Licensed under the Apache License, Vershandlern 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permisshandlerns and
// limitathandlerns under the License.
package data
import (
"context"
"fmt"
"time"
"github.com/ossf/scorecard/cron/config"
"gocloud.dev/blob"
_ "gocloud.dev/blob/gcsblob" // Needed to link in GCP drivers.
)
const (
// filePrefixFormat uses ISO 8601 standard, i.e - YYYY-MM-DDTHH:MM:SS.
// This format guarantees that lexicographically sorted files are chronologically sorted.
filePrefixFormat = "2006.01.02/150405/"
)
func WriteToBlobStore(ctx context.Context, bucketURL, filename string, data []byte) error {
bucket, err := blob.OpenBucket(ctx, bucketURL)
if err != nil {
return fmt.Errorf("error from blob.OpenBucket: %w", err)
}
defer bucket.Close()
blobWriter, err := bucket.NewWriter(ctx, filename, nil)
if err != nil {
return fmt.Errorf("error from bucket.NewWriter: %w", err)
}
if _, err = blobWriter.Write(data); err != nil {
return fmt.Errorf("error from blobWriter.Write: %w", err)
}
if err := blobWriter.Close(); err != nil {
return fmt.Errorf("error from blobWriter.Close: %w", err)
}
return nil
}
func GetBlobFilename(filename string, datetime time.Time) string {
return datetime.Format(filePrefixFormat) + filename
}
func GetShardNumFilename(datetime time.Time) string {
return GetBlobFilename(config.ShardNumFilename, datetime)
}

55
cron/data/blob_test.go Normal file
View File

@ -0,0 +1,55 @@
// Copyright 2021 Security Scorecard Authors
//
// Licensed under the Apache License, Vershandlern 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permisshandlerns and
// limitathandlerns under the License.
package data
import (
"testing"
"time"
)
const (
inputTimeFormat string = "2006-01-02T15:04:05"
)
func TestGetBlobFilename(t *testing.T) {
t.Parallel()
testcases := []struct {
name string
inputTime string
inputFilename string
expectedFilename string
}{
{
name: "Basic",
inputTime: "2021-04-23T15:06:43",
inputFilename: "file-000",
expectedFilename: "2021.04.23/150643/file-000",
},
}
for _, testcase := range testcases {
testcase := testcase
t.Run(testcase.name, func(t *testing.T) {
t.Parallel()
datetime, err := time.Parse(inputTimeFormat, testcase.inputTime)
if err != nil {
t.Errorf("failed to parse testcase.inputTime %s: %w", testcase.inputTime, err)
}
gotFilename := GetBlobFilename(testcase.inputFilename, datetime)
if gotFilename != testcase.expectedFilename {
t.Errorf("test failed - expected: %s, got: %s", testcase.expectedFilename, gotFilename)
}
})
}
}

67
cron/data/iterator.go Normal file
View File

@ -0,0 +1,67 @@
// Copyright 2021 Security Scorecard Authors
//
// Licensed under the Apache License, Vershandlern 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permisshandlerns and
// limitathandlerns under the License.
package data
import (
"encoding/csv"
"errors"
"fmt"
"io"
"github.com/jszwec/csvutil"
"github.com/ossf/scorecard/repos"
)
type Iterator interface {
HasNext() bool
Next() (repos.RepoURL, error)
}
func MakeIterator(reader io.Reader) (Iterator, error) {
dec, err := csvutil.NewDecoder(csv.NewReader(reader))
if err != nil {
return &csvIterator{}, fmt.Errorf("error in csvutil.NewDecoder: %w", err)
}
return &csvIterator{decoder: dec}, nil
}
type inputRepo struct {
Repo string `csv:"repo"`
Metadata string `csv:"metadata"`
}
type csvIterator struct {
decoder *csvutil.Decoder
err error
next inputRepo
}
func (reader *csvIterator) HasNext() bool {
reader.err = reader.decoder.Decode(&reader.next)
return reader.err != io.EOF
}
func (reader *csvIterator) Next() (repos.RepoURL, error) {
if reader.err != nil {
return repos.RepoURL{}, reader.err
}
ret := repos.RepoURL{}
var err error
err = ret.Set(reader.next.Repo)
if err == nil {
err = ret.ValidGitHubURL()
}
return ret, errors.Unwrap(err)
}

178
cron/data/iterator_test.go Normal file
View File

@ -0,0 +1,178 @@
// Copyright 2021 Security Scorecard Authors
//
// Licensed under the Apache License, Vershandlern 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permisshandlerns and
// limitathandlerns under the License.
package data
import (
"errors"
"os"
"testing"
"github.com/ossf/scorecard/repos"
)
type outcome struct {
expectedErr error
repo repos.RepoURL
hasError bool
}
// nolint: gocognit
func TestCsvIterator(t *testing.T) {
t.Parallel()
testcases := []struct {
name string
filename string
outcomes []outcome
}{
{
name: "Basic",
filename: "testdata/basic.csv",
outcomes: []outcome{
{
hasError: false,
repo: repos.RepoURL{
Host: "github.com",
Owner: "owner1",
Repo: "repo1",
},
},
{
hasError: false,
repo: repos.RepoURL{
Host: "github.com",
Owner: "owner2",
Repo: "repo2",
},
},
{
hasError: false,
repo: repos.RepoURL{
Host: "github.com",
Owner: "owner3",
Repo: "repo3",
},
},
},
},
{
name: "FailingURLs",
filename: "testdata/failing_urls.csv",
outcomes: []outcome{
{
hasError: true,
expectedErr: repos.ErrorUnsupportedHost,
},
{
hasError: true,
expectedErr: repos.ErrorInvalidURL,
},
{
hasError: true,
expectedErr: repos.ErrorInvalidURL,
},
},
},
{
name: "EmptyRows",
filename: "testdata/empty_row.csv",
outcomes: []outcome{
{
hasError: false,
repo: repos.RepoURL{
Host: "github.com",
Owner: "owner1",
Repo: "repo1",
},
},
{
hasError: false,
repo: repos.RepoURL{
Host: "github.com",
Owner: "owner2",
Repo: "repo2",
},
},
{
hasError: false,
repo: repos.RepoURL{
Host: "github.com",
Owner: "owner3",
Repo: "repo3",
},
},
},
},
{
name: "ExtraColumns",
filename: "testdata/extra_column.csv",
outcomes: []outcome{
{
hasError: false,
repo: repos.RepoURL{
Host: "github.com",
Owner: "owner1",
Repo: "repo1",
},
},
{
hasError: false,
repo: repos.RepoURL{
Host: "github.com",
Owner: "owner2",
Repo: "repo2",
},
},
{
hasError: true,
},
},
},
}
for _, testcase := range testcases {
testcase := testcase
t.Run(testcase.name, func(t *testing.T) {
t.Parallel()
testFile, err := os.OpenFile(testcase.filename, os.O_RDONLY, 0o644)
if err != nil {
t.Errorf("failed to open %s: %v", testcase.filename, err)
}
defer testFile.Close()
testReader, err := MakeIterator(testFile)
if err != nil {
t.Errorf("failed to create reader: %v", err)
}
for _, outcome := range testcase.outcomes {
if !testReader.HasNext() {
t.Error("expected outcome, got EOF")
}
repoURL, err := testReader.Next()
if (err != nil) != outcome.hasError {
t.Errorf("expected hasError: %t, got: %v", outcome.hasError, err)
}
if !outcome.hasError && repoURL != outcome.repo {
t.Errorf("expected repoURL: %s, got %s", &outcome.repo, repoURL)
}
if outcome.hasError && outcome.expectedErr != nil && !errors.Is(err, outcome.expectedErr) {
t.Errorf("expected error: %v, got %v", outcome.expectedErr, err)
}
}
if testReader.HasNext() {
t.Error("actual reader has more repos than expected")
}
})
}
}

194
cron/data/request.pb.go Normal file
View File

@ -0,0 +1,194 @@
// Copyright 2021 Security Scorecard Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.25.0
// protoc v3.15.8
// source: cron/data/request.proto
package data
import (
proto "github.com/golang/protobuf/proto"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
// This is a compile-time assertion that a sufficiently up-to-date version
// of the legacy proto package is being used.
const _ = proto.ProtoPackageIsVersion4
type ScorecardBatchRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Repos []string `protobuf:"bytes,1,rep,name=repos,proto3" json:"repos,omitempty"`
ShardNum *int32 `protobuf:"varint,2,opt,name=shard_num,json=shardNum,proto3,oneof" json:"shard_num,omitempty"`
JobTime *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=job_time,json=jobTime,proto3,oneof" json:"job_time,omitempty"`
}
func (x *ScorecardBatchRequest) Reset() {
*x = ScorecardBatchRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_cron_data_request_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ScorecardBatchRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ScorecardBatchRequest) ProtoMessage() {}
func (x *ScorecardBatchRequest) ProtoReflect() protoreflect.Message {
mi := &file_cron_data_request_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ScorecardBatchRequest.ProtoReflect.Descriptor instead.
func (*ScorecardBatchRequest) Descriptor() ([]byte, []int) {
return file_cron_data_request_proto_rawDescGZIP(), []int{0}
}
func (x *ScorecardBatchRequest) GetRepos() []string {
if x != nil {
return x.Repos
}
return nil
}
func (x *ScorecardBatchRequest) GetShardNum() int32 {
if x != nil && x.ShardNum != nil {
return *x.ShardNum
}
return 0
}
func (x *ScorecardBatchRequest) GetJobTime() *timestamppb.Timestamp {
if x != nil {
return x.JobTime
}
return nil
}
var File_cron_data_request_proto protoreflect.FileDescriptor
var file_cron_data_request_proto_rawDesc = []byte{
0x0a, 0x17, 0x63, 0x72, 0x6f, 0x6e, 0x2f, 0x64, 0x61, 0x74, 0x61, 0x2f, 0x72, 0x65, 0x71, 0x75,
0x65, 0x73, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x18, 0x6f, 0x73, 0x73, 0x66, 0x2e,
0x73, 0x63, 0x6f, 0x72, 0x65, 0x63, 0x61, 0x72, 0x64, 0x2e, 0x63, 0x72, 0x6f, 0x6e, 0x2e, 0x64,
0x61, 0x74, 0x61, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x22, 0xa6, 0x01, 0x0a, 0x15, 0x53, 0x63, 0x6f, 0x72, 0x65, 0x63, 0x61,
0x72, 0x64, 0x42, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14,
0x0a, 0x05, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x05, 0x72,
0x65, 0x70, 0x6f, 0x73, 0x12, 0x20, 0x0a, 0x09, 0x73, 0x68, 0x61, 0x72, 0x64, 0x5f, 0x6e, 0x75,
0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x48, 0x00, 0x52, 0x08, 0x73, 0x68, 0x61, 0x72, 0x64,
0x4e, 0x75, 0x6d, 0x88, 0x01, 0x01, 0x12, 0x3a, 0x0a, 0x08, 0x6a, 0x6f, 0x62, 0x5f, 0x74, 0x69,
0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c,
0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73,
0x74, 0x61, 0x6d, 0x70, 0x48, 0x01, 0x52, 0x07, 0x6a, 0x6f, 0x62, 0x54, 0x69, 0x6d, 0x65, 0x88,
0x01, 0x01, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x73, 0x68, 0x61, 0x72, 0x64, 0x5f, 0x6e, 0x75, 0x6d,
0x42, 0x0b, 0x0a, 0x09, 0x5f, 0x6a, 0x6f, 0x62, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x42, 0x25, 0x5a,
0x23, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6f, 0x73, 0x73, 0x66,
0x2f, 0x73, 0x63, 0x6f, 0x72, 0x65, 0x63, 0x61, 0x72, 0x64, 0x2f, 0x63, 0x72, 0x6f, 0x6e, 0x2f,
0x64, 0x61, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_cron_data_request_proto_rawDescOnce sync.Once
file_cron_data_request_proto_rawDescData = file_cron_data_request_proto_rawDesc
)
func file_cron_data_request_proto_rawDescGZIP() []byte {
file_cron_data_request_proto_rawDescOnce.Do(func() {
file_cron_data_request_proto_rawDescData = protoimpl.X.CompressGZIP(file_cron_data_request_proto_rawDescData)
})
return file_cron_data_request_proto_rawDescData
}
var file_cron_data_request_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
var file_cron_data_request_proto_goTypes = []interface{}{
(*ScorecardBatchRequest)(nil), // 0: ossf.scorecard.cron.data.ScorecardBatchRequest
(*timestamppb.Timestamp)(nil), // 1: google.protobuf.Timestamp
}
var file_cron_data_request_proto_depIdxs = []int32{
1, // 0: ossf.scorecard.cron.data.ScorecardBatchRequest.job_time:type_name -> google.protobuf.Timestamp
1, // [1:1] is the sub-list for method output_type
1, // [1:1] is the sub-list for method input_type
1, // [1:1] is the sub-list for extension type_name
1, // [1:1] is the sub-list for extension extendee
0, // [0:1] is the sub-list for field type_name
}
func init() { file_cron_data_request_proto_init() }
func file_cron_data_request_proto_init() {
if File_cron_data_request_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_cron_data_request_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ScorecardBatchRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
file_cron_data_request_proto_msgTypes[0].OneofWrappers = []interface{}{}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_cron_data_request_proto_rawDesc,
NumEnums: 0,
NumMessages: 1,
NumExtensions: 0,
NumServices: 0,
},
GoTypes: file_cron_data_request_proto_goTypes,
DependencyIndexes: file_cron_data_request_proto_depIdxs,
MessageInfos: file_cron_data_request_proto_msgTypes,
}.Build()
File_cron_data_request_proto = out.File
file_cron_data_request_proto_rawDesc = nil
file_cron_data_request_proto_goTypes = nil
file_cron_data_request_proto_depIdxs = nil
}

27
cron/data/request.proto Normal file
View File

@ -0,0 +1,27 @@
// Copyright 2021 Security Scorecard Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package ossf.scorecard.cron.data;
import "google/protobuf/timestamp.proto";
option go_package = "github.com/ossf/scorecard/cron/data";
message ScorecardBatchRequest {
repeated string repos = 1;
optional int32 shard_num = 2;
optional google.protobuf.Timestamp job_time = 3;
}

4
cron/data/testdata/basic.csv vendored Normal file
View File

@ -0,0 +1,4 @@
repo,metadata
github.com/owner1/repo1,
github.com/owner2/repo2,
github.com/owner3/repo3,meta
1 repo metadata
2 github.com/owner1/repo1
3 github.com/owner2/repo2
4 github.com/owner3/repo3 meta

5
cron/data/testdata/empty_row.csv vendored Normal file
View File

@ -0,0 +1,5 @@
repo,metadata
github.com/owner1/repo1,
github.com/owner2/repo2,
github.com/owner3/repo3,meta
1 repo metadata
2 github.com/owner1/repo1
3 github.com/owner2/repo2
4 github.com/owner3/repo3 meta

4
cron/data/testdata/extra_column.csv vendored Normal file
View File

@ -0,0 +1,4 @@
repo,metadata
github.com/owner1/repo1,
github.com/owner2/repo2,
github.com/owner3/repo3,meta,,
1 repo,metadata
2 github.com/owner1/repo1,
3 github.com/owner2/repo2,
4 github.com/owner3/repo3,meta,,

4
cron/data/testdata/failing_urls.csv vendored Normal file
View File

@ -0,0 +1,4 @@
repo,metadata
gitlab.com/owner1/repo1,
github.com/owner2/,
github.com//repo3,meta
1 repo metadata
2 gitlab.com/owner1/repo1
3 github.com/owner2/
4 github.com//repo3 meta

89
cron/pubsub/publisher.go Normal file
View File

@ -0,0 +1,89 @@
// Copyright 2021 Security Scorecard Authors
//
// Licensed under the Apache License, Vershandlern 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permisshandlerns and
// limitathandlerns under the License.
package pubsub
import (
"context"
"errors"
"fmt"
"log"
"sync"
"sync/atomic"
"github.com/ossf/scorecard/cron/data"
"gocloud.dev/pubsub"
_ "gocloud.dev/pubsub/gcppubsub" // Needed to link in GCP drivers.
"google.golang.org/protobuf/encoding/protojson"
)
var errorPublish = errors.New("total errors when publishing")
type Publisher interface {
Publish(request *data.ScorecardBatchRequest) error
Close() error
}
func CreatePublisher(ctx context.Context, topicURL string) (Publisher, error) {
ret := publisherImpl{}
topic, err := pubsub.OpenTopic(ctx, topicURL)
if err != nil {
return &ret, fmt.Errorf("error from pubsub.OpenTopic: %w", err)
}
return &publisherImpl{
ctx: ctx,
topic: topic,
}, nil
}
type sender interface {
Send(ctx context.Context, msg *pubsub.Message) error
}
type publisherImpl struct {
ctx context.Context
topic sender
wg sync.WaitGroup
totalErrors uint64
}
func (publisher *publisherImpl) Publish(request *data.ScorecardBatchRequest) error {
msg, err := protojson.Marshal(request)
if err != nil {
return fmt.Errorf("error from protojson.Marshal: %w", err)
}
publisher.wg.Add(1)
go func() {
defer publisher.wg.Done()
err := publisher.topic.Send(publisher.ctx, &pubsub.Message{
Body: msg,
})
if err != nil {
log.Printf("Error when publishing message %s: %v", msg, err)
atomic.AddUint64(&publisher.totalErrors, 1)
return
}
log.Print("Successfully published message")
}()
return nil
}
func (publisher *publisherImpl) Close() error {
publisher.wg.Wait()
if publisher.totalErrors > 0 {
return fmt.Errorf("%w: %d", errorPublish, publisher.totalErrors)
}
return nil
}

View File

@ -0,0 +1,86 @@
// Copyright 2021 Security Scorecard Authors
//
// Licensed under the Apache License, Vershandlern 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permisshandlerns and
// limitathandlerns under the License.
package pubsub
import (
"context"
"fmt"
"testing"
"github.com/ossf/scorecard/cron/data"
"gocloud.dev/pubsub"
)
type mockSucceedTopic struct{}
func (topic *mockSucceedTopic) Send(ctx context.Context, msg *pubsub.Message) error {
return nil
}
type mockFailTopic struct{}
func (topic *mockFailTopic) Send(ctx context.Context, msg *pubsub.Message) error {
// nolint: goerr113
return fmt.Errorf("mockFailTopic failed to send")
}
func TestPublish(t *testing.T) {
t.Parallel()
// nolint: govet
testcases := []struct {
numErrors uint64
name string
errorMsg string
hasError bool
topic sender
}{
{
name: "SendFails",
topic: &mockFailTopic{},
hasError: true,
numErrors: 1,
errorMsg: "",
},
{
name: "SendSucceeds",
topic: &mockSucceedTopic{},
hasError: false,
},
}
for _, testcase := range testcases {
testcase := testcase
t.Run(testcase.name, func(t *testing.T) {
t.Parallel()
ctx := context.Background()
publisher := publisherImpl{
ctx: ctx,
topic: testcase.topic,
}
request := data.ScorecardBatchRequest{
Repos: []string{"repo1"},
}
if err := publisher.Publish(&request); err != nil {
t.Errorf("Failed to parse message: %v", err)
}
err := publisher.Close()
if (err == nil) == testcase.hasError {
t.Errorf("Test failed. Expected: %t got: %v", testcase.hasError, err)
}
if testcase.hasError && testcase.numErrors != publisher.totalErrors {
t.Errorf("Test failed. Expected numErrors: %d, got: %d", testcase.numErrors, publisher.totalErrors)
}
})
}
}

5
go.mod
View File

@ -5,6 +5,7 @@ go 1.16
require (
github.com/aws/aws-sdk-go v1.36.30 // indirect
github.com/bradleyfalzon/ghinstallation v1.1.1
github.com/golang/protobuf v1.5.2
github.com/google/go-github/v32 v32.1.0
github.com/jszwec/csvutil v1.5.0
github.com/kr/text v0.2.0 // indirect
@ -20,10 +21,10 @@ require (
go.uber.org/zap v1.16.0
gocloud.dev v0.22.0
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83 // indirect
golang.org/x/mod v0.4.2 // indirect
golang.org/x/oauth2 v0.0.0-20201203001011-0b49973bad19
golang.org/x/sys v0.0.0-20210503173754-0981d6026fa6 // indirect
golang.org/x/tools v0.1.1-0.20210302220138-2ac05c832e1a // indirect
golang.org/x/tools v0.1.1-0.20210504170620-03ebc2c9fca8 // indirect
google.golang.org/protobuf v1.26.0
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b // indirect
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect

14
go.sum
View File

@ -32,6 +32,7 @@ cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2k
cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw=
cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA=
cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjpRVR+TSU=
cloud.google.com/go/pubsub v1.9.0 h1:KT1LvuKJG2FMHA4HhOC/QFJ/f6i9kdNlXB4U43prxjg=
cloud.google.com/go/pubsub v1.9.0/go.mod h1:G3o6/kJvEMIEAN5urdkaP4be49WQsjNiykBIto9LFtY=
cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0ZeosJ0Rtdos=
@ -406,6 +407,7 @@ github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.3/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.opencensus.io v0.15.0/go.mod h1:UffZAU+4sDEINUGP/B7UfBBkq4fqLu9zXAX7ke6CHW0=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
@ -473,7 +475,6 @@ golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzB
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@ -515,6 +516,7 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY
golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0=
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@ -535,6 +537,8 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@ -580,7 +584,8 @@ golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20201202213521-69691e467435/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210503173754-0981d6026fa6 h1:cdsMqa2nXzqlgs183pHxtvoVwU7CyzaCTAUOg94af4c=
golang.org/x/sys v0.0.0-20210503173754-0981d6026fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@ -655,8 +660,8 @@ golang.org/x/tools v0.0.0-20201202200335-bef1c476418a/go.mod h1:emZCQorbCU4vsT4f
golang.org/x/tools v0.0.0-20201203202102-a1a1cbeaa516/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/tools v0.1.1-0.20210302220138-2ac05c832e1a h1:wcmQQeIPy0fYbQMsfxwcnzKbuBLMGaHcN0nbzHbIjdo=
golang.org/x/tools v0.1.1-0.20210302220138-2ac05c832e1a/go.mod h1:9bzcO0MWcOuT0tm1iBGzDVPshzfwoVvREIui8C+MHqU=
golang.org/x/tools v0.1.1-0.20210504170620-03ebc2c9fca8 h1:rTLms91GhM16y4sUcNGLdel0jJ8jXdQeXuN+7evgYiQ=
golang.org/x/tools v0.1.1-0.20210504170620-03ebc2c9fca8/go.mod h1:sH/Eidr0EddymY8HZSakBo32zU3fG5ovDq874hJLjVg=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@ -763,6 +768,7 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD
google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=