Use lease extension for PubSub worker (#533)

Co-authored-by: Azeem Shaikh <azeems@google.com>
This commit is contained in:
Azeem Shaikh 2021-06-02 17:59:42 -07:00 committed by GitHub
parent 030bc90932
commit 3b86d57217
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 223 additions and 48 deletions

View File

@ -19,10 +19,6 @@ import (
"errors"
"fmt"
"gocloud.dev/pubsub"
// Needed to link in GCP drivers.
_ "gocloud.dev/pubsub/gcppubsub"
"google.golang.org/protobuf/encoding/protojson"
"github.com/ossf/scorecard/cron/data"
@ -34,55 +30,18 @@ var ErrorInParse = errors.New("error during protojson.Unmarshal")
// Subscriber is the pull-based consumer interface used to read
// ScorecardBatchRequest messages from a PubSub subscription.
type Subscriber interface {
// SynchronousPull fetches the next request. Implementations in this package
// may return (nil, nil) when no message could be obtained.
SynchronousPull() (*data.ScorecardBatchRequest, error)
// Ack acknowledges the most recently pulled message.
Ack()
// Nack signals that the most recently pulled message was not processed.
Nack()
// Close releases the underlying subscription or client.
Close() error
}
func CreateSubscriber(ctx context.Context, subscriptionURL string) (Subscriber, error) {
subscription, err := pubsub.OpenSubscription(ctx, subscriptionURL)
if err != nil {
return nil, fmt.Errorf("error during pubsub.OpenSubscription: %w", err)
}
ret := subscriberImpl{
ctx: ctx,
subscription: subscription,
}
return &ret, nil
return createGCSSubscriber(ctx, subscriptionURL)
}
type receiver interface {
Receive(ctx context.Context) (*pubsub.Message, error)
Shutdown(ctx context.Context) error
}
type subscriberImpl struct {
ctx context.Context
subscription receiver
msg *pubsub.Message
}
func (subscriber *subscriberImpl) SynchronousPull() (*data.ScorecardBatchRequest, error) {
msg, err := subscriber.subscription.Receive(subscriber.ctx)
if err != nil {
fmt.Printf("error during Receive: %v", err)
return nil, nil
}
subscriber.msg = msg
func parseJSONToRequest(jsonData []byte) (*data.ScorecardBatchRequest, error) {
ret := &data.ScorecardBatchRequest{}
if err := protojson.Unmarshal(msg.Body, ret); err != nil {
if err := protojson.Unmarshal(jsonData, ret); err != nil {
return nil, fmt.Errorf("%w: %v", ErrorInParse, err)
}
return ret, nil
}
func (subscriber *subscriberImpl) Ack() {
subscriber.msg.Ack()
}
func (subscriber *subscriberImpl) Close() error {
err := subscriber.subscription.Shutdown(subscriber.ctx)
if err != nil {
return fmt.Errorf("error during subscription.Shutdown: %w", err)
}
return nil
}

View File

@ -0,0 +1,137 @@
// Copyright 2021 Security Scorecard Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pubsub
import (
"context"
"fmt"
"log"
"strings"
"time"
pubsub "cloud.google.com/go/pubsub/apiv1"
pubsubpb "google.golang.org/genproto/googleapis/pubsub/v1"
"github.com/ossf/scorecard/cron/data"
)
// Settings for the Cloud Pub/Sub pull subscriber.
const (
	// gcpPubsubPrefix is the URL scheme trimmed from the subscription URL
	// before it is handed to the Pub/Sub client.
	gcpPubsubPrefix = "gcppubsub://"
	// maxMessagesToPull caps how many messages one Pull request may return.
	maxMessagesToPull = 1
	// ackDeadlineExtensionInSec is the ack deadline, in seconds, requested on
	// each ModifyAckDeadline call.
	ackDeadlineExtensionInSec = 600
	// gracePeriodInSec is subtracted from the ack deadline to decide how long
	// to wait before sending the next extension.
	gracePeriodInSec = 60
)
// gcsSubscriber is the Subscriber implementation that talks to Cloud Pub/Sub
// through a SubscriberClient. It tracks one pulled message at a time.
type gcsSubscriber struct {
	ctx    context.Context
	client *pubsub.SubscriberClient
	// subscriptionURL is the subscription name with the gcppubsub:// prefix removed.
	subscriptionURL string
	// pullRequest is built once at construction and reused for every Pull.
	pullRequest *pubsubpb.PullRequest
	// recvdAckID is the ack ID of the most recently pulled message.
	recvdAckID string
	// done is closed to stop the goroutine that extends the ack deadline.
	done chan bool
}
// createGCSSubscriber builds a gcsSubscriber for subscriptionURL.
//
// A leading gcppubsub:// scheme is trimmed from subscriptionURL; the remainder
// is used both as the subscription name and in the reusable pull request.
// It returns an error if the Pub/Sub subscriber client cannot be created.
func createGCSSubscriber(ctx context.Context, subscriptionURL string) (*gcsSubscriber, error) {
	subscriberClient, err := pubsub.NewSubscriberClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("error during NewSubscriberClient: %w", err)
	}

	name := strings.TrimPrefix(subscriptionURL, gcpPubsubPrefix)
	request := &pubsubpb.PullRequest{
		Subscription: name,
		MaxMessages:  maxMessagesToPull,
	}
	return &gcsSubscriber{
		ctx:             ctx,
		client:          subscriberClient,
		subscriptionURL: name,
		pullRequest:     request,
	}, nil
}
// extendAckDeadline repeatedly pushes out the ack deadline of the message that
// was current when the goroutine started. It is meant to run in its own
// goroutine and returns when the subscriber's context is cancelled or the
// message's done channel is closed.
//
// The first extension is sent immediately; each later one is sent
// gracePeriodInSec seconds before the previously requested deadline would
// lapse. A failed extension terminates the process via log.Fatal.
func (subscriber *gcsSubscriber) extendAckDeadline() {
	// Snapshot the per-message state once. SynchronousPull overwrites these
	// fields for the next message; re-reading them on every iteration would
	// race with that write and could make this goroutine miss its own close
	// signal or extend a different message.
	done := subscriber.done
	ackID := subscriber.recvdAckID

	ackDeadline := ackDeadlineExtensionInSec * time.Second
	request := &pubsubpb.ModifyAckDeadlineRequest{
		Subscription:       subscriber.subscriptionURL,
		AckIds:             []string{ackID},
		AckDeadlineSeconds: int32(ackDeadline.Seconds()),
	}

	// A single timer is reused and stopped on exit, so an early return does not
	// leave a multi-minute timer pending the way time.After in a loop would.
	timer := time.NewTimer(0)
	defer timer.Stop()

	for {
		select {
		case <-subscriber.ctx.Done():
			return
		case <-done:
			return
		case <-timer.C:
			if err := subscriber.client.ModifyAckDeadline(subscriber.ctx, request); err != nil {
				log.Fatal(err)
			}
			// timer.C was just drained, so Reset is safe without a Stop.
			timer.Reset(ackDeadline - gracePeriodInSec*time.Second)
		}
	}
}
// SynchronousPull issues one Pull request and returns the decoded batch
// request from the message it yields.
//
// It returns (nil, nil) when the response contains no messages, and an error
// when the Pull call fails or the payload cannot be parsed. Receiving more
// than maxMessagesToPull messages terminates the process. For a received
// message, its ack ID is recorded and a goroutine is started that keeps
// extending its ack deadline until the done channel is closed.
func (subscriber *gcsSubscriber) SynchronousPull() (*data.ScorecardBatchRequest, error) {
	response, err := subscriber.client.Pull(subscriber.ctx, subscriber.pullRequest)
	if err != nil {
		return nil, fmt.Errorf("error during client.Pull: %w", err)
	}

	received := response.ReceivedMessages
	switch count := len(received); {
	case count <= 0:
		// client.Pull hands back an empty list when the backlog has no
		// messages available.
		return nil, nil
	case count > maxMessagesToPull:
		// Sanity check: the request asked for at most maxMessagesToPull.
		log.Fatalf("expected to receive max %d messages, got %d", maxMessagesToPull, count)
	}

	current := received[0]
	subscriber.recvdAckID = current.AckId
	subscriber.done = make(chan bool)
	// Keep telling the server that this message is still being processed.
	go subscriber.extendAckDeadline()

	return parseJSONToRequest(current.GetMessage().GetData())
}
// Ack acknowledges the most recently pulled message and stops the goroutine
// that extends its ack deadline. A failed Acknowledge call terminates the
// process via log.Fatal.
//
// It is safe to call when no message has been pulled yet or after Nack/Close
// has already stopped the extension goroutine.
func (subscriber *gcsSubscriber) Ack() {
	err := subscriber.client.Acknowledge(subscriber.ctx, &pubsubpb.AcknowledgeRequest{
		Subscription: subscriber.subscriptionURL,
		AckIds:       []string{subscriber.recvdAckID},
	})
	// Closing a nil or already-closed channel panics. Nothing is ever sent on
	// done, so a successful receive can only mean it was closed already.
	if subscriber.done != nil {
		select {
		case <-subscriber.done:
		default:
			close(subscriber.done)
		}
	}
	if err != nil {
		log.Fatal(err)
	}
}
// Nack stops extending the ack deadline of the most recently pulled message
// without acknowledging it.
// NOTE(review): presumably Pub/Sub redelivers the message once the current
// deadline lapses — confirm against the subscription's settings.
//
// It is safe to call when no message has been pulled yet or after Ack/Close
// has already stopped the extension goroutine.
func (subscriber *gcsSubscriber) Nack() {
	// Closing a nil or already-closed channel panics. Nothing is ever sent on
	// done, so a successful receive can only mean it was closed already.
	if subscriber.done == nil {
		return
	}
	select {
	case <-subscriber.done:
	default:
		// Stop extending Ack deadline.
		close(subscriber.done)
	}
}
// Close stops any running ack-deadline extension and closes the Pub/Sub
// client, returning a wrapped error if the client fails to close.
//
// It is safe to call when no message has been pulled yet or after Ack/Nack
// has already stopped the extension goroutine; in both cases the client is
// still closed.
func (subscriber *gcsSubscriber) Close() error {
	// Closing a nil or already-closed channel panics, which would also skip
	// client.Close below. Nothing is ever sent on done, so a successful receive
	// can only mean it was closed already.
	if subscriber.done != nil {
		select {
		case <-subscriber.done:
		default:
			close(subscriber.done)
		}
	}
	if err := subscriber.client.Close(); err != nil {
		return fmt.Errorf("error during Close: %w", err)
	}
	return nil
}

View File

@ -0,0 +1,77 @@
// Copyright 2021 Security Scorecard Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pubsub
import (
"context"
"fmt"
"gocloud.dev/pubsub"
// Needed to link in GCP drivers.
_ "gocloud.dev/pubsub/gcppubsub"
"github.com/ossf/scorecard/cron/data"
)
// receiver is the part of a gocloud subscription that gocloudSubscriber relies
// on. Keeping it as an interface lets a mock stand in for the real subscription.
type receiver interface {
	// Shutdown releases the subscription.
	Shutdown(ctx context.Context) error
	// Receive returns the next message from the subscription.
	Receive(ctx context.Context) (*pubsub.Message, error)
}
// gocloudSubscriber is a Subscriber implementation built on the gocloud.dev
// pubsub package.
type gocloudSubscriber struct {
	// subscription is the source that messages are received from.
	subscription receiver
	// msg is the most recently received message; Ack and Nack act on it.
	msg *pubsub.Message
	// ctx is passed to every Receive and Shutdown call.
	ctx context.Context
}
// createGocloudSubscriber opens the subscription at subscriptionURL through
// gocloud.dev/pubsub and wraps it in a gocloudSubscriber. It returns a wrapped
// error if the subscription cannot be opened.
// nolint:unused,deadcode
func createGocloudSubscriber(ctx context.Context, subscriptionURL string) (*gocloudSubscriber, error) {
	opened, err := pubsub.OpenSubscription(ctx, subscriptionURL)
	if err != nil {
		return nil, fmt.Errorf("error during pubsub.OpenSubscription: %w", err)
	}
	return &gocloudSubscriber{
		ctx:          ctx,
		subscription: opened,
	}, nil
}
// SynchronousPull receives one message from the subscription, remembers it for
// a later Ack or Nack, and returns its body decoded as a ScorecardBatchRequest.
//
// A Receive failure is only printed; the caller gets (nil, nil) in that case.
// A payload that cannot be parsed is returned as an error.
func (subscriber *gocloudSubscriber) SynchronousPull() (*data.ScorecardBatchRequest, error) {
	received, recvErr := subscriber.subscription.Receive(subscriber.ctx)
	if recvErr != nil {
		fmt.Printf("error during Receive: %v", recvErr)
		return nil, nil
	}

	subscriber.msg = received
	return parseJSONToRequest(received.Body)
}
// Ack acknowledges the message stored by the last SynchronousPull.
func (subscriber *gocloudSubscriber) Ack() {
	pending := subscriber.msg
	pending.Ack()
}
// Nack negatively acknowledges the message stored by the last SynchronousPull.
func (subscriber *gocloudSubscriber) Nack() {
	pending := subscriber.msg
	pending.Nack()
}
// Close shuts down the underlying subscription, returning a wrapped error if
// the shutdown fails.
func (subscriber *gocloudSubscriber) Close() error {
	if shutdownErr := subscriber.subscription.Shutdown(subscriber.ctx); shutdownErr != nil {
		return fmt.Errorf("error during subscription.Shutdown: %w", shutdownErr)
	}
	return nil
}

View File

@ -81,7 +81,7 @@ func TestSubscriber(t *testing.T) {
if err != nil {
t.Errorf("testcase parsing failed during protojson.Marshal: %v", err)
}
subscriber := subscriberImpl{
subscriber := gocloudSubscriber{
ctx: ctx,
subscription: &mockReceiver{
msg: &pubsub.Message{Body: msgBody},

View File

@ -168,7 +168,8 @@ func main() {
break
}
if err := processRequest(ctx, req, checksToRun, bucketURL, httpClient, githubClient, graphClient); err != nil {
panic(err)
// Nack the message so that another worker can retry.
subscriber.Nack()
}
// nolint: errcheck // flushes buffer
logger.Sync()

3
go.mod
View File

@ -4,6 +4,7 @@ go 1.16
require (
cloud.google.com/go/bigquery v1.18.0
cloud.google.com/go/pubsub v1.10.3
contrib.go.opencensus.io/exporter/stackdriver v0.13.6
github.com/bradleyfalzon/ghinstallation v1.1.1
github.com/golangci/golangci-lint v1.40.1
@ -27,7 +28,7 @@ require (
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
golang.org/x/net v0.0.0-20210510120150-4163338589ed // indirect
golang.org/x/sys v0.0.0-20210514084401-e8d321eab015 // indirect
google.golang.org/genproto v0.0.0-20210513213006-bf773b8c8384 // indirect
google.golang.org/genproto v0.0.0-20210513213006-bf773b8c8384
google.golang.org/grpc v1.37.1 // indirect
google.golang.org/protobuf v1.26.0
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b // indirect