mirror of
https://github.com/neilotoole/sq.git
synced 2024-11-28 12:33:44 +03:00
got rid of dead notify package (#116)
This commit is contained in:
parent
540adfac58
commit
85b8879324
@ -549,7 +549,6 @@ type writers struct {
|
|||||||
recordw output.RecordWriter
|
recordw output.RecordWriter
|
||||||
metaw output.MetadataWriter
|
metaw output.MetadataWriter
|
||||||
srcw output.SourceWriter
|
srcw output.SourceWriter
|
||||||
notifyw output.NotificationWriter
|
|
||||||
errw output.ErrorWriter
|
errw output.ErrorWriter
|
||||||
pingw output.PingWriter
|
pingw output.PingWriter
|
||||||
}
|
}
|
||||||
@ -572,7 +571,6 @@ func newWriters(log lg.Log, cmd *cobra.Command, defaults config.Defaults, out, e
|
|||||||
metaw: tablew.NewMetadataWriter(out2, fm),
|
metaw: tablew.NewMetadataWriter(out2, fm),
|
||||||
srcw: tablew.NewSourceWriter(out2, fm),
|
srcw: tablew.NewSourceWriter(out2, fm),
|
||||||
pingw: tablew.NewPingWriter(out2, fm),
|
pingw: tablew.NewPingWriter(out2, fm),
|
||||||
notifyw: tablew.NewNotifyWriter(out2, fm),
|
|
||||||
errw: tablew.NewErrorWriter(errOut2, fm),
|
errw: tablew.NewErrorWriter(errOut2, fm),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,34 +0,0 @@
|
|||||||
package tablew
|
|
||||||
|
|
||||||
import (
|
|
||||||
"io"
|
|
||||||
|
|
||||||
"github.com/neilotoole/sq/cli/output"
|
|
||||||
"github.com/neilotoole/sq/libsq/notify"
|
|
||||||
)
|
|
||||||
|
|
||||||
type notifyWriter struct {
|
|
||||||
tbl *table
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewNotifyWriter implements output.NotificationWriter.
|
|
||||||
func NewNotifyWriter(out io.Writer, fm *output.Formatting) output.NotificationWriter {
|
|
||||||
tbl := &table{out: out, header: fm.ShowHeader, fm: fm }
|
|
||||||
w := ¬ifyWriter{tbl: tbl}
|
|
||||||
w.tbl.reset()
|
|
||||||
return w
|
|
||||||
}
|
|
||||||
|
|
||||||
// NotifyDestinations implements output.NotificationWriter.
|
|
||||||
func (w *notifyWriter) NotifyDestinations(dests []notify.Destination) error {
|
|
||||||
w.tbl.tblImpl.SetHeader([]string{"NOTIFIER", "TYPE", "TARGET"})
|
|
||||||
var rows [][]string
|
|
||||||
|
|
||||||
for _, dest := range dests {
|
|
||||||
row := []string{dest.Label, string(dest.Type), dest.Target}
|
|
||||||
rows = append(rows, row)
|
|
||||||
}
|
|
||||||
|
|
||||||
w.tbl.appendRowsAndRenderAll(rows)
|
|
||||||
return nil
|
|
||||||
}
|
|
@ -11,7 +11,6 @@ import (
|
|||||||
|
|
||||||
"github.com/neilotoole/sq/libsq/core/sqlz"
|
"github.com/neilotoole/sq/libsq/core/sqlz"
|
||||||
"github.com/neilotoole/sq/libsq/driver"
|
"github.com/neilotoole/sq/libsq/driver"
|
||||||
"github.com/neilotoole/sq/libsq/notify"
|
|
||||||
"github.com/neilotoole/sq/libsq/source"
|
"github.com/neilotoole/sq/libsq/source"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -62,12 +61,6 @@ type SourceWriter interface {
|
|||||||
Source(src *source.Source) error
|
Source(src *source.Source) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// NotificationWriter outputs notification destination details.
|
|
||||||
type NotificationWriter interface {
|
|
||||||
// NotifyDestinations outputs details of the notification
|
|
||||||
// destinations.
|
|
||||||
NotifyDestinations(dests []notify.Destination) error
|
|
||||||
}
|
|
||||||
|
|
||||||
// ErrorWriter outputs errors.
|
// ErrorWriter outputs errors.
|
||||||
type ErrorWriter interface {
|
type ErrorWriter interface {
|
||||||
|
@ -1,69 +0,0 @@
|
|||||||
package notify
|
|
||||||
|
|
||||||
import (
|
|
||||||
"time"
|
|
||||||
|
|
||||||
uuid "github.com/satori/go.uuid"
|
|
||||||
|
|
||||||
"github.com/neilotoole/sq/libsq/core/stringz"
|
|
||||||
)
|
|
||||||
|
|
||||||
// State is the job state, one of Created, Running, Completed or Failed.
|
|
||||||
type State string
|
|
||||||
|
|
||||||
// Possible values of State.
|
|
||||||
const (
|
|
||||||
Created = State("CREATED")
|
|
||||||
Running = State("RUNNING")
|
|
||||||
Completed = State("COMPLETED")
|
|
||||||
Canceled = State("CANCELED")
|
|
||||||
Failed = State("FAILED")
|
|
||||||
)
|
|
||||||
|
|
||||||
// Job represents a single libsq engine workflow instance.
|
|
||||||
type Job struct {
|
|
||||||
ID string `yaml:"id" json:"id"`
|
|
||||||
Started *time.Time `yaml:"started,omitempty" json:"started,omitempty"`
|
|
||||||
Ended *time.Time `yaml:"ended,omitempty" json:"ended,omitempty"`
|
|
||||||
|
|
||||||
// Stmt is the SLQ/SQL statement/query this job is executing.
|
|
||||||
Stmt string `yaml:"stmt" json:"stmt"`
|
|
||||||
State State `yaml:"state" json:"state"`
|
|
||||||
Errors []error `yaml:"errors,omitempty" json:"errors,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// New returns a new Job, with a generated ID, and State set to Created.
|
|
||||||
// The Started and Ended fields are both nil.
|
|
||||||
func New(stmt string) *Job {
|
|
||||||
return &Job{ID: uuid.NewV4().String(), State: Created, Stmt: stmt}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (j *Job) String() string {
|
|
||||||
return stringz.SprintJSON(j)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start sets the job.Started timestamp, and job.State to Running.
|
|
||||||
func (j *Job) Start() *Job {
|
|
||||||
now := time.Now()
|
|
||||||
j.Started = &now
|
|
||||||
j.State = Running
|
|
||||||
return j
|
|
||||||
}
|
|
||||||
|
|
||||||
// Complete sets the job.Ended timestamp, and job.State to Completed.
|
|
||||||
func (j *Job) Complete() *Job {
|
|
||||||
now := time.Now()
|
|
||||||
j.Ended = &now
|
|
||||||
j.State = Completed
|
|
||||||
return j
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fail sets the job.Ended timestamp, job.State to Failed,
|
|
||||||
// and adds the provided errors to job.Errors
|
|
||||||
func (j *Job) Fail(errs ...error) *Job {
|
|
||||||
now := time.Now()
|
|
||||||
j.Ended = &now
|
|
||||||
j.State = Failed
|
|
||||||
j.Errors = append(j.Errors, errs...)
|
|
||||||
return j
|
|
||||||
}
|
|
@ -1,147 +0,0 @@
|
|||||||
// Package notify is an experiment for sending notifications.
|
|
||||||
package notify
|
|
||||||
|
|
||||||
import (
|
|
||||||
"regexp"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/neilotoole/lg"
|
|
||||||
|
|
||||||
"github.com/neilotoole/sq/libsq/core/errz"
|
|
||||||
"github.com/neilotoole/sq/libsq/core/stringz"
|
|
||||||
)
|
|
||||||
|
|
||||||
// DestType is the destination type, e.g. "slack", "hipchat", or "email" etc.
|
|
||||||
type DestType string
|
|
||||||
|
|
||||||
// Destination is a destination for messages.
|
|
||||||
type Destination struct {
|
|
||||||
Type DestType `yaml:"type" json:"type"`
|
|
||||||
Label string `yaml:"label" json:"label"`
|
|
||||||
Target string `yaml:"target" json:"target"`
|
|
||||||
Credentials string `yaml:"credentials" json:"credentials"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d Destination) String() string {
|
|
||||||
return stringz.SprintJSON(d)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Message is a notification message, optionally containing a Job that the message is associated with.
|
|
||||||
type Message struct {
|
|
||||||
Text string `yaml:"text" json:"text"`
|
|
||||||
Job *Job `yaml:"job,empty" json:"job,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewJobMessage creates a Message indicating the state of the job.
|
|
||||||
func NewJobMessage(jb Job) Message {
|
|
||||||
m := Message{Job: &jb}
|
|
||||||
return m
|
|
||||||
}
|
|
||||||
|
|
||||||
// Notifier is an interface that can send notification messages.
|
|
||||||
type Notifier interface {
|
|
||||||
// Send sends the message.
|
|
||||||
Send(msg Message) error
|
|
||||||
}
|
|
||||||
|
|
||||||
// Provider is a factory that returns Notifier instances and generates notification Destinations from user parameters.
|
|
||||||
type Provider interface {
|
|
||||||
// Destination returns a notification Destination instance from the supplied parameters.
|
|
||||||
Destination(typ DestType, target string, label string, credentials string,
|
|
||||||
labelAvailable func(label string) bool) (*Destination, error)
|
|
||||||
// Notifier returns a Notifier instance for the given destination.
|
|
||||||
Notifier(dest Destination) (Notifier, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
var providers = make(map[DestType]Provider)
|
|
||||||
|
|
||||||
// RegisterProvider should be invoked by notification implementations to
|
|
||||||
// indicate that they handle a specific destination type.
|
|
||||||
func RegisterProvider(typ DestType, p Provider) {
|
|
||||||
providers[typ] = p
|
|
||||||
}
|
|
||||||
|
|
||||||
// ProviderFor returns a Provider for the specified destination type.
|
|
||||||
func ProviderFor(typ DestType) (Provider, error) {
|
|
||||||
p, ok := providers[typ]
|
|
||||||
if !ok {
|
|
||||||
return nil, errz.Errorf("unsupported notification destination type %q", typ)
|
|
||||||
}
|
|
||||||
return p, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewAsyncNotifier returns a Notifier that sends messages asynchronously to the supplied destination.
|
|
||||||
// The invoking code should call AsyncNotifier.Wait() before exiting.
|
|
||||||
// TODO: Should take a context.Context param.
|
|
||||||
func NewAsyncNotifier(log lg.Log, dests []Destination) (*AsyncNotifier, error) {
|
|
||||||
notifiers := make([]Notifier, len(dests))
|
|
||||||
|
|
||||||
for i, dest := range dests {
|
|
||||||
provider, ok := providers[dest.Type]
|
|
||||||
if !ok {
|
|
||||||
return nil, errz.Errorf("no provider for notification destination type %q", dest.Type)
|
|
||||||
}
|
|
||||||
|
|
||||||
notifier, err := provider.Notifier(dest)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
notifiers[i] = notifier
|
|
||||||
}
|
|
||||||
|
|
||||||
return &AsyncNotifier{log: log, dests: notifiers, wg: &sync.WaitGroup{}, done: make(chan struct{})}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// AsyncNotifier is a Notifier that wraps a bunch of other
|
|
||||||
// notifiers and sends message asynchronously. The invoking code
|
|
||||||
// should call AsyncNotifier.Wait() before exiting.
|
|
||||||
type AsyncNotifier struct {
|
|
||||||
log lg.Log
|
|
||||||
dests []Notifier
|
|
||||||
done chan struct{}
|
|
||||||
wg *sync.WaitGroup
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *AsyncNotifier) Send(msg Message) error {
|
|
||||||
a.wg.Add(len(a.dests))
|
|
||||||
|
|
||||||
for _, dest := range a.dests {
|
|
||||||
dest := dest
|
|
||||||
go func() {
|
|
||||||
defer a.wg.Done()
|
|
||||||
err := dest.Send(msg)
|
|
||||||
if err != nil {
|
|
||||||
a.log.Warnf("problem sending notification: %v", err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *AsyncNotifier) Wait(timeout time.Duration) {
|
|
||||||
go func() {
|
|
||||||
a.wg.Wait()
|
|
||||||
close(a.done)
|
|
||||||
}()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-a.done:
|
|
||||||
case <-time.After(timeout):
|
|
||||||
a.log.Warnf("hit timeout before all notifiers completed")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var handlePattern = regexp.MustCompile(`\A[a-zA-Z][a-zA-Z0-9_]*$`)
|
|
||||||
|
|
||||||
// ValidHandle returns an error if handle is not an acceptable notification destination handle value.
|
|
||||||
func ValidHandle(handle string) error {
|
|
||||||
const msg = `invalid notification destination handle value %q: must begin with a letter, followed by zero or more letters, digits, or underscores, e.g. "slack_devops"` //nolint:lll
|
|
||||||
|
|
||||||
if !handlePattern.MatchString(handle) {
|
|
||||||
return errz.Errorf(msg, handle)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user