diff --git a/cron/data/summary.go b/cron/data/summary.go new file mode 100644 index 00000000..056cec8e --- /dev/null +++ b/cron/data/summary.go @@ -0,0 +1,134 @@ +// Copyright 2021 Security Scorecard Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package data + +import ( + "context" + "fmt" + "strings" + "time" + + "google.golang.org/protobuf/encoding/protojson" + + "github.com/ossf/scorecard/v4/cron/config" +) + +// ShardSummary is a summary of information about a set of shards with the same +// creation time. +type ShardSummary struct { + creationTime time.Time + shardMetadata []byte + shardsExpected int + shardsCreated int + isTransferred bool +} + +// IsCompleted checks if the percentage of completed shards is over the desired completion threshold. +// It also returns false to prevent transfers in cases where the expected number of shards is 0, +// as either the .shard_metadata file is missing, or there is nothing to transfer anyway. +func (s *ShardSummary) IsCompleted(completionThreshold float64) bool { + completedPercentage := float64(s.shardsCreated) / float64(s.shardsExpected) + return s.shardsExpected > 0 && completedPercentage >= completionThreshold +} + +// IsTransferred returns true if the shards have already been transferred. +// A true value indicates that a transfer should not occur, a false value +// indicates that a transfer should occur if IsCompleted() also returns true. +func (s *ShardSummary) IsTransferred() bool { + return s.isTransferred +} + +// Metadata returns the raw metadata about the bucket. +func (s *ShardSummary) Metadata() []byte { + return s.shardMetadata +} + +// CreationTime returns the time the shards were created. This corresponds to +// the job time generated by the controller. +func (s *ShardSummary) CreationTime() time.Time { + return s.creationTime +} + +func (s *ShardSummary) MarkTransferred(ctx context.Context, bucketURL string) error { + transferStatusFilename := GetTransferStatusFilename(s.creationTime) + if err := WriteToBlobStore(ctx, bucketURL, transferStatusFilename, nil); err != nil { + return fmt.Errorf("error during WriteToBlobStore: %w", err) + } + return nil +} + +// BucketSummary contains details about all the shards in a bucket grouped by +// their creation time. +type BucketSummary struct { + shards map[time.Time]*ShardSummary +} + +func (summary *BucketSummary) getOrCreate(t time.Time) *ShardSummary { + if summary.shards[t] == nil { + summary.shards[t] = &ShardSummary{ + creationTime: t, + } + } + return summary.shards[t] +} + +// Shards returns a slice of ShardSummary instances for each shard creation time. +func (summary *BucketSummary) Shards() []*ShardSummary { + var shards []*ShardSummary + for _, s := range summary.shards { + shards = append(shards, s) + } + return shards +} + +// GetBucketSummary iterates through all files in a bucket and returns a +// BucketSummary with details on each set of shards grouped by creation time. +func GetBucketSummary(ctx context.Context, bucketURL string) (*BucketSummary, error) { + keys, err := GetBlobKeys(ctx, bucketURL) + if err != nil { + return nil, fmt.Errorf("error getting BlobKeys: %w", err) + } + + summary := BucketSummary{ + shards: make(map[time.Time]*ShardSummary), + } + for _, key := range keys { + creationTime, filename, err := ParseBlobFilename(key) + if err != nil { + return nil, fmt.Errorf("error parsing Blob key: %w", err) + } + switch { + case strings.HasPrefix(filename, "shard-"): + summary.getOrCreate(creationTime).shardsCreated++ + case filename == config.TransferStatusFilename: + summary.getOrCreate(creationTime).isTransferred = true + case filename == config.ShardMetadataFilename: + keyData, err := GetBlobContent(ctx, bucketURL, key) + if err != nil { + return nil, fmt.Errorf("error during GetBlobContent: %w", err) + } + var metadata ShardMetadata + if err := protojson.Unmarshal(keyData, &metadata); err != nil { + return nil, fmt.Errorf("error parsing data as ShardMetadata: %w", err) + } + summary.getOrCreate(creationTime).shardsExpected = int(metadata.GetNumShard()) + summary.getOrCreate(creationTime).shardMetadata = keyData + default: + //nolint: goerr113 + return nil, fmt.Errorf("found unrecognized file: %s", key) + } + } + return &summary, nil +} diff --git a/cron/internal/bq/main_test.go b/cron/data/summary_test.go similarity index 84% rename from cron/internal/bq/main_test.go rename to cron/data/summary_test.go index ac428202..6cd2f91e 100644 --- a/cron/internal/bq/main_test.go +++ b/cron/data/summary_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package main +package data import ( "context" @@ -60,7 +60,11 @@ func TestIsCompleted(t *testing.T) { testcase := testcase t.Run(testcase.name, func(t *testing.T) { t.Parallel() - completed := isCompleted(testcase.inputExpected, testcase.inputCreated, testcase.completedThreshold) + shards := &ShardSummary{ + shardsExpected: testcase.inputExpected, + shardsCreated: testcase.inputCreated, + } + completed := shards.IsCompleted(testcase.completedThreshold) if completed != testcase.expectedCompleted { t.Errorf("test failed - expected: %t, got: %t", testcase.expectedCompleted, completed) } @@ -74,21 +78,23 @@ func TestGetBucketSummary(t *testing.T) { testcases := []struct { name string blobPath string - want *bucketSummary + want *BucketSummary wantErr bool }{ { name: "basic", - blobPath: "testdata/basic", - want: &bucketSummary{ - shards: map[time.Time]*shardSummary{ + blobPath: "testdata/summary_test/basic", + want: &BucketSummary{ + shards: map[time.Time]*ShardSummary{ time.Date(2022, 9, 19, 2, 0, 1, 0, time.UTC): { + creationTime: time.Date(2022, 9, 19, 2, 0, 1, 0, time.UTC), shardMetadata: []byte(`{"shardLoc":"test","numShard":3,"commitSha":"2231d1f722454c6c9aa6ad77377d2936803216ff"}`), shardsExpected: 3, shardsCreated: 2, isTransferred: true, }, time.Date(2022, 9, 26, 2, 0, 3, 0, time.UTC): { + creationTime: time.Date(2022, 9, 26, 2, 0, 3, 0, time.UTC), shardMetadata: []byte(`{"shardLoc":"test","numShard":5,"commitSha":"2231d1f722454c6c9aa6ad77377d2936803216ff"}`), shardsExpected: 5, shardsCreated: 3, @@ -100,7 +106,7 @@ func TestGetBucketSummary(t *testing.T) { }, { name: "invalid file present", - blobPath: "testdata/invalid", + blobPath: "testdata/summary_test/invalid", want: nil, wantErr: true, }, @@ -117,7 +123,7 @@ func TestGetBucketSummary(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } - summary, err := getBucketSummary(context.Background(), "file:///"+testdataPath) + summary, err := GetBucketSummary(context.Background(), "file:///"+testdataPath) if (err != nil) != tt.wantErr { t.Errorf("getBucketSummary() error = %v, wantErr %v", err, tt.wantErr) return diff --git a/cron/internal/bq/testdata/basic/2022.09.19/020001/.shard_metadata b/cron/data/testdata/summary_test/basic/2022.09.19/020001/.shard_metadata similarity index 100% rename from cron/internal/bq/testdata/basic/2022.09.19/020001/.shard_metadata rename to cron/data/testdata/summary_test/basic/2022.09.19/020001/.shard_metadata diff --git a/cron/internal/bq/testdata/basic/2022.09.19/020001/.transfer_complete b/cron/data/testdata/summary_test/basic/2022.09.19/020001/.transfer_complete similarity index 100% rename from cron/internal/bq/testdata/basic/2022.09.19/020001/.transfer_complete rename to cron/data/testdata/summary_test/basic/2022.09.19/020001/.transfer_complete diff --git a/cron/internal/bq/testdata/basic/2022.09.19/020001/shard-0000000 b/cron/data/testdata/summary_test/basic/2022.09.19/020001/shard-0000000 similarity index 100% rename from cron/internal/bq/testdata/basic/2022.09.19/020001/shard-0000000 rename to cron/data/testdata/summary_test/basic/2022.09.19/020001/shard-0000000 diff --git a/cron/internal/bq/testdata/basic/2022.09.19/020001/shard-0000001 b/cron/data/testdata/summary_test/basic/2022.09.19/020001/shard-0000001 similarity index 100% rename from cron/internal/bq/testdata/basic/2022.09.19/020001/shard-0000001 rename to cron/data/testdata/summary_test/basic/2022.09.19/020001/shard-0000001 diff --git a/cron/internal/bq/testdata/basic/2022.09.26/020003/.shard_metadata b/cron/data/testdata/summary_test/basic/2022.09.26/020003/.shard_metadata similarity index 100% rename from cron/internal/bq/testdata/basic/2022.09.26/020003/.shard_metadata rename to cron/data/testdata/summary_test/basic/2022.09.26/020003/.shard_metadata diff --git a/cron/internal/bq/testdata/basic/2022.09.26/020003/shard-0000000 b/cron/data/testdata/summary_test/basic/2022.09.26/020003/shard-0000000 similarity index 100% rename from cron/internal/bq/testdata/basic/2022.09.26/020003/shard-0000000 rename to cron/data/testdata/summary_test/basic/2022.09.26/020003/shard-0000000 diff --git a/cron/internal/bq/testdata/basic/2022.09.26/020003/shard-0000001 b/cron/data/testdata/summary_test/basic/2022.09.26/020003/shard-0000001 similarity index 100% rename from cron/internal/bq/testdata/basic/2022.09.26/020003/shard-0000001 rename to cron/data/testdata/summary_test/basic/2022.09.26/020003/shard-0000001 diff --git a/cron/internal/bq/testdata/basic/2022.09.26/020003/shard-1234567 b/cron/data/testdata/summary_test/basic/2022.09.26/020003/shard-1234567 similarity index 100% rename from cron/internal/bq/testdata/basic/2022.09.26/020003/shard-1234567 rename to cron/data/testdata/summary_test/basic/2022.09.26/020003/shard-1234567 diff --git a/cron/internal/bq/testdata/invalid/unknown_file b/cron/data/testdata/summary_test/invalid/unknown_file similarity index 100% rename from cron/internal/bq/testdata/invalid/unknown_file rename to cron/data/testdata/summary_test/invalid/unknown_file diff --git a/cron/internal/bq/main.go b/cron/internal/bq/main.go index 8a6401fb..e36ae726 100644 --- a/cron/internal/bq/main.go +++ b/cron/internal/bq/main.go @@ -23,106 +23,35 @@ import ( "io" "log" "net/http" - "strings" - "time" - - "google.golang.org/protobuf/encoding/protojson" "github.com/ossf/scorecard/v4/cron/config" "github.com/ossf/scorecard/v4/cron/data" ) -type shardSummary struct { - shardMetadata []byte - shardsExpected int - shardsCreated int - isTransferred bool -} - -type bucketSummary struct { - shards map[time.Time]*shardSummary -} - -func (summary *bucketSummary) getOrCreate(t time.Time) *shardSummary { - if summary.shards[t] == nil { - summary.shards[t] = new(shardSummary) - } - return summary.shards[t] -} - -// getBucketSummary iterates through all files in a bucket and -// returns `shardSummary` keyed by the shard creation time. -func getBucketSummary(ctx context.Context, bucketURL string) (*bucketSummary, error) { - keys, err := data.GetBlobKeys(ctx, bucketURL) - if err != nil { - return nil, fmt.Errorf("error getting BlobKeys: %w", err) - } - - summary := bucketSummary{ - shards: make(map[time.Time]*shardSummary), - } - for _, key := range keys { - creationTime, filename, err := data.ParseBlobFilename(key) - if err != nil { - return nil, fmt.Errorf("error parsing Blob key: %w", err) - } - switch { - case strings.HasPrefix(filename, "shard-"): - summary.getOrCreate(creationTime).shardsCreated++ - case filename == config.TransferStatusFilename: - summary.getOrCreate(creationTime).isTransferred = true - case filename == config.ShardMetadataFilename: - keyData, err := data.GetBlobContent(ctx, bucketURL, key) - if err != nil { - return nil, fmt.Errorf("error during GetBlobContent: %w", err) - } - var metadata data.ShardMetadata - if err := protojson.Unmarshal(keyData, &metadata); err != nil { - return nil, fmt.Errorf("error parsing data as ShardMetadata: %w", err) - } - summary.getOrCreate(creationTime).shardsExpected = int(metadata.GetNumShard()) - summary.getOrCreate(creationTime).shardMetadata = keyData - default: - //nolint: goerr113 - return nil, fmt.Errorf("found unrecognized file: %s", key) - } - } - return &summary, nil -} - -// isCompleted checks if the percentage of completed shards is over the desired completion threshold. -// It also returns false to prevent transfers in cases where the expected number of shards is 0, -// as either the .shard_metadata file is missing, or there is nothing to transfer anyway. -func isCompleted(expected, created int, completionThreshold float64) bool { - completedPercentage := float64(created) / float64(expected) - return expected > 0 && completedPercentage >= completionThreshold -} - func transferDataToBq(ctx context.Context, bucketURL, projectID, datasetName, tableName string, completionThreshold float64, webhookURL string, - summary *bucketSummary, + summary *data.BucketSummary, ) error { - for creationTime, shards := range summary.shards { - if shards.isTransferred || !isCompleted(shards.shardsExpected, shards.shardsCreated, completionThreshold) { + for _, shards := range summary.Shards() { + if shards.IsTransferred() || !shards.IsCompleted(completionThreshold) { continue } - shardFileURI := data.GetBlobFilename("shard-*", creationTime) + shardFileURI := data.GetBlobFilename("shard-*", shards.CreationTime()) if err := startDataTransferJob(ctx, bucketURL, shardFileURI, projectID, datasetName, tableName, - creationTime); err != nil { + shards.CreationTime()); err != nil { return fmt.Errorf("error during StartDataTransferJob: %w", err) } - transferStatusFilename := data.GetTransferStatusFilename(creationTime) - if err := data.WriteToBlobStore(ctx, bucketURL, transferStatusFilename, nil); err != nil { - return fmt.Errorf("error during WriteToBlobStore: %w", err) + if err := shards.MarkTransferred(ctx, bucketURL); err != nil { + return fmt.Errorf("error during MarkTransferred: %w", err) } if webhookURL == "" { continue } //nolint: noctx, gosec // variable URL is ok here. - resp, err := http.Post(webhookURL, "application/json", bytes.NewBuffer(shards.shardMetadata)) + resp, err := http.Post(webhookURL, "application/json", bytes.NewBuffer(shards.Metadata())) if err != nil { return fmt.Errorf("error during http.Post to %s: %w", webhookURL, err) } @@ -177,7 +106,7 @@ func main() { panic(err) } - summary, err := getBucketSummary(ctx, bucketURL) + summary, err := data.GetBucketSummary(ctx, bucketURL) if err != nil { panic(err) }