Introduce sub command register under stolonctl to auto register stolon for service discovery

Co-authored-by: Dinesh B <dineshudt17@gmail.com>
Co-authored-by: Abdul Rahman K <kadkab.abdul@gmail.com>
This commit is contained in:
Krishnaswamy Subramanian 2018-08-04 18:24:59 +05:30
parent 25c0fce06f
commit 4730d0ae7f
No known key found for this signature in database
GPG Key ID: 4A6253752F4C9C3D
18 changed files with 1980 additions and 2 deletions

2
.gitignore vendored
View File

@ -14,3 +14,5 @@ Session.vim
#
bin/
/release/
.idea

View File

@ -0,0 +1,169 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: ./cmd/stolonctl/cmd/register/discovery.go
// Package mock_register is a generated GoMock package.
package mock_register
import (
gomock "github.com/golang/mock/gomock"
api "github.com/hashicorp/consul/api"
register "github.com/sorintlab/stolon/cmd/stolonctl/cmd/register"
reflect "reflect"
)
// MockServiceDiscovery is a mock of ServiceDiscovery interface
type MockServiceDiscovery struct {
ctrl *gomock.Controller
recorder *MockServiceDiscoveryMockRecorder
}
// MockServiceDiscoveryMockRecorder is the mock recorder for MockServiceDiscovery
type MockServiceDiscoveryMockRecorder struct {
mock *MockServiceDiscovery
}
// NewMockServiceDiscovery creates a new mock instance
func NewMockServiceDiscovery(ctrl *gomock.Controller) *MockServiceDiscovery {
mock := &MockServiceDiscovery{ctrl: ctrl}
mock.recorder = &MockServiceDiscoveryMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *MockServiceDiscovery) EXPECT() *MockServiceDiscoveryMockRecorder {
return m.recorder
}
// Register mocks base method
func (m *MockServiceDiscovery) Register(info *register.ServiceInfo) error {
ret := m.ctrl.Call(m, "Register", info)
ret0, _ := ret[0].(error)
return ret0
}
// Register indicates an expected call of Register
func (mr *MockServiceDiscoveryMockRecorder) Register(info interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Register", reflect.TypeOf((*MockServiceDiscovery)(nil).Register), info)
}
// Services mocks base method
func (m *MockServiceDiscovery) Services(name string) (register.ServiceInfos, error) {
ret := m.ctrl.Call(m, "Services", name)
ret0, _ := ret[0].(register.ServiceInfos)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// Services indicates an expected call of Services
func (mr *MockServiceDiscoveryMockRecorder) Services(name interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Services", reflect.TypeOf((*MockServiceDiscovery)(nil).Services), name)
}
// DeRegister mocks base method
func (m *MockServiceDiscovery) DeRegister(info *register.ServiceInfo) error {
ret := m.ctrl.Call(m, "DeRegister", info)
ret0, _ := ret[0].(error)
return ret0
}
// DeRegister indicates an expected call of DeRegister
func (mr *MockServiceDiscoveryMockRecorder) DeRegister(info interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeRegister", reflect.TypeOf((*MockServiceDiscovery)(nil).DeRegister), info)
}
// MockConsulAgent is a mock of ConsulAgent interface
type MockConsulAgent struct {
ctrl *gomock.Controller
recorder *MockConsulAgentMockRecorder
}
// MockConsulAgentMockRecorder is the mock recorder for MockConsulAgent
type MockConsulAgentMockRecorder struct {
mock *MockConsulAgent
}
// NewMockConsulAgent creates a new mock instance
func NewMockConsulAgent(ctrl *gomock.Controller) *MockConsulAgent {
mock := &MockConsulAgent{ctrl: ctrl}
mock.recorder = &MockConsulAgentMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *MockConsulAgent) EXPECT() *MockConsulAgentMockRecorder {
return m.recorder
}
// ServiceRegister mocks base method
func (m *MockConsulAgent) ServiceRegister(service *api.AgentServiceRegistration) error {
ret := m.ctrl.Call(m, "ServiceRegister", service)
ret0, _ := ret[0].(error)
return ret0
}
// ServiceRegister indicates an expected call of ServiceRegister
func (mr *MockConsulAgentMockRecorder) ServiceRegister(service interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ServiceRegister", reflect.TypeOf((*MockConsulAgent)(nil).ServiceRegister), service)
}
// ServiceDeregister mocks base method
func (m *MockConsulAgent) ServiceDeregister(serviceID string) error {
ret := m.ctrl.Call(m, "ServiceDeregister", serviceID)
ret0, _ := ret[0].(error)
return ret0
}
// ServiceDeregister indicates an expected call of ServiceDeregister
func (mr *MockConsulAgentMockRecorder) ServiceDeregister(serviceID interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ServiceDeregister", reflect.TypeOf((*MockConsulAgent)(nil).ServiceDeregister), serviceID)
}
// Services mocks base method
func (m *MockConsulAgent) Services() (map[string]*api.AgentService, error) {
ret := m.ctrl.Call(m, "Services")
ret0, _ := ret[0].(map[string]*api.AgentService)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// Services indicates an expected call of Services
func (mr *MockConsulAgentMockRecorder) Services() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Services", reflect.TypeOf((*MockConsulAgent)(nil).Services))
}
// MockConsulCatalog is a mock of ConsulCatalog interface
type MockConsulCatalog struct {
ctrl *gomock.Controller
recorder *MockConsulCatalogMockRecorder
}
// MockConsulCatalogMockRecorder is the mock recorder for MockConsulCatalog
type MockConsulCatalogMockRecorder struct {
mock *MockConsulCatalog
}
// NewMockConsulCatalog creates a new mock instance
func NewMockConsulCatalog(ctrl *gomock.Controller) *MockConsulCatalog {
mock := &MockConsulCatalog{ctrl: ctrl}
mock.recorder = &MockConsulCatalogMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *MockConsulCatalog) EXPECT() *MockConsulCatalogMockRecorder {
return m.recorder
}
// Service mocks base method
func (m *MockConsulCatalog) Service(service, tag string, q *api.QueryOptions) ([]*api.CatalogService, *api.QueryMeta, error) {
ret := m.ctrl.Call(m, "Service", service, tag, q)
ret0, _ := ret[0].([]*api.CatalogService)
ret1, _ := ret[1].(*api.QueryMeta)
ret2, _ := ret[2].(error)
return ret0, ret1, ret2
}
// Service indicates an expected call of Service
func (mr *MockConsulCatalogMockRecorder) Service(service, tag, q interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Service", reflect.TypeOf((*MockConsulCatalog)(nil).Service), service, tag, q)
}

View File

@ -0,0 +1,197 @@
// Copyright 2019 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
// limitations under the License.
package cmd
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
"github.com/sorintlab/stolon/cmd"
"github.com/sorintlab/stolon/cmd/stolonctl/cmd/register"
slog "github.com/sorintlab/stolon/internal/log"
"github.com/sorintlab/stolon/internal/store"
"github.com/spf13/cobra"
"go.uber.org/zap"
)
//Register command to register stolon master and slave for service discovery
var Register = &cobra.Command{
Use: "register",
Short: "Register stolon keepers for service discovery",
Run: runRegister,
Version: cmd.Version,
}
var rCfg register.Config
var log = slog.S()
func init() {
Register.PersistentFlags().StringVar(&rCfg.Backend, "register-backend", "consul", "register backend type (consul)")
Register.PersistentFlags().StringVar(&rCfg.Endpoints, "register-endpoints", "http://127.0.0.1:8500", "a comma-delimited list of register endpoints (use https scheme for tls communication) defaults: http://127.0.0.1:8500 for consul")
Register.PersistentFlags().StringVar(&rCfg.TLSCertFile, "register-cert-file", "", "certificate file for client identification to the register")
Register.PersistentFlags().StringVar(&rCfg.TLSKeyFile, "register-key", "", "private key file for client identification to the register")
Register.PersistentFlags().BoolVar(&rCfg.TLSInsecureSkipVerify, "register-skip-tls-verify", false, "skip register certificate verification (insecure!!!)")
Register.PersistentFlags().StringVar(&rCfg.TLSCAFile, "register-ca-file", "", "verify certificates of HTTPS-enabled register servers using this CA bundle")
Register.PersistentFlags().BoolVar(&rCfg.RegisterMaster, "register-master", false, "register master as well for service discovery (use it with caution!!!)")
Register.PersistentFlags().StringVar(&rCfg.TagMasterAs, "tag-master-as", "master", "a comma-delimited list of tag to be used when registering master")
Register.PersistentFlags().StringVar(&rCfg.TagSlaveAs, "tag-slave-as", "slave", "a comma-delimited list of tag to be used when registering slave")
Register.PersistentFlags().BoolVar(&cfg.Debug, "debug", false, "enable debug logging")
Register.PersistentFlags().IntVar(&rCfg.SleepInterval, "sleep-interval", 10, "number of seconds to sleep before probing for change")
CmdStolonCtl.AddCommand(Register)
}
func sleepInterval() time.Duration {
return time.Duration(rCfg.SleepInterval) * time.Second
}
func checkConfig(cfg *config, rCfg *register.Config) error {
if err := cmd.CheckCommonConfig(&cfg.CommonConfig); err != nil {
return err
}
return rCfg.Validate()
}
func runRegister(c *cobra.Command, _ []string) {
switch cfg.LogLevel {
case "error":
slog.SetLevel(zap.ErrorLevel)
case "warn":
slog.SetLevel(zap.WarnLevel)
case "info":
slog.SetLevel(zap.InfoLevel)
case "debug":
slog.SetLevel(zap.DebugLevel)
default:
die("invalid log level: %v", cfg.LogLevel)
}
if cfg.Debug {
slog.SetDebug()
}
if cmd.IsColorLoggerEnable(c, &cfg.CommonConfig) {
log = slog.SColor()
}
if err := checkConfig(&cfg, &rCfg); err != nil {
die(err.Error())
}
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
if err := registerCluster(sigs, &cfg, &rCfg); err != nil {
die(err.Error())
}
}
func registerCluster(sigs chan os.Signal, cfg *config, rCfg *register.Config) error {
s, err := cmd.NewStore(&cfg.CommonConfig)
if err != nil {
return err
}
endCh := make(chan struct{})
timerCh := time.NewTimer(0).C
service, err := register.NewServiceDiscovery(rCfg)
if err != nil {
return err
}
for {
select {
case <-sigs:
return nil
case <-timerCh:
go func() {
checkAndRegisterMasterAndSlaves(cfg.ClusterName, s, service, rCfg.RegisterMaster)
endCh <- struct{}{}
}()
case <-endCh:
timerCh = time.NewTimer(sleepInterval()).C
}
}
}
func checkAndRegisterMasterAndSlaves(clusterName string, store store.Store, discovery register.ServiceDiscovery, registerMaster bool) {
discoveredServices, err := discovery.Services(clusterName)
if err != nil {
log.Errorf("unable to get info about existing services: %v", err)
return
}
existingServices, err := getExistingServices(clusterName, store, registerMaster)
if err == nil {
log.Debugf("found services %v", existingServices)
} else {
log.Errorf("%s skipping", err.Error())
return
}
diff := existingServices.Diff(discoveredServices)
for _, removed := range diff.Removed {
deRegisterService(discovery, &removed)
}
for _, added := range diff.Added {
registerService(discovery, &added)
}
}
func getExistingServices(clusterName string, store store.Store, includeMaster bool) (register.ServiceInfos, error) {
cluster, err := register.NewCluster(clusterName, rCfg, store)
if err != nil {
return nil, fmt.Errorf("cannot get cluster data: %v", err)
}
result := register.ServiceInfos{}
infos, err := cluster.ServiceInfos()
if err != nil {
return nil, fmt.Errorf("cannot get service infos: %v", err)
}
for uid, info := range infos {
if !includeMaster && info.IsMaster {
log.Infof("skipping registering master")
continue
}
result[uid] = info
}
return result, nil
}
func registerService(service register.ServiceDiscovery, serviceInfo *register.ServiceInfo) {
if serviceInfo == nil {
return
}
if err := service.Register(serviceInfo); err != nil {
log.Errorf("unable to register %s with uid %s as %v, reason: %s", serviceInfo.Name, serviceInfo.ID, serviceInfo.Tags, err.Error())
} else {
log.Infof("successfully registered %s with uid %s as %v", serviceInfo.Name, serviceInfo.ID, serviceInfo.Tags)
}
}
func deRegisterService(service register.ServiceDiscovery, serviceInfo *register.ServiceInfo) {
if serviceInfo == nil {
return
}
if err := service.DeRegister(serviceInfo); err != nil {
log.Errorf("unable to deregister %s with uid %s as %v, reason: %s", serviceInfo.Name, serviceInfo.ID, serviceInfo.Tags, err.Error())
} else {
log.Infof("successfully deregistered %s with uid %s as %v", serviceInfo.Name, serviceInfo.ID, serviceInfo.Tags)
}
}

View File

@ -0,0 +1,71 @@
// Copyright 2019 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
// limitations under the License.
package register
import (
"context"
"errors"
"github.com/sorintlab/stolon/internal/cluster"
"github.com/sorintlab/stolon/internal/store"
)
// Cluster type exposes necessary methods to find master and slave
// from underlying store
type Cluster struct {
name string
cd *cluster.ClusterData
tagMasterAs Tags
tagSlaveAs Tags
}
// NewCluster returns an new instance of Cluster
func NewCluster(name string, rCfg Config, store store.Store) (*Cluster, error) {
cd, _, err := store.GetClusterData(context.TODO())
if err != nil {
return nil, err
} else if cd == nil {
return nil, errors.New("no cluster data available")
}
return &Cluster{name: name, cd: cd, tagMasterAs: NewTags(rCfg.TagMasterAs), tagSlaveAs: NewTags(rCfg.TagSlaveAs)}, nil
}
// ServiceInfos returns all the service information from the cluster data in underlying store
func (c *Cluster) ServiceInfos() (ServiceInfos, error) {
if c.cd.Cluster == nil {
return nil, errors.New("cluster data not available")
}
serviceInfos := ServiceInfos{}
master := c.cd.Cluster.Status.Master
for uid, db := range c.cd.DBs {
if db.Status.Healthy {
tags := c.tagSlaveAs
isMaster := false
if uid == master {
tags = c.tagMasterAs
isMaster = true
}
info, err := NewServiceInfo(c.name, db, tags, isMaster)
if err != nil {
return nil, err
}
serviceInfos[uid] = *info
}
}
return serviceInfos, nil
}

View File

@ -0,0 +1,152 @@
// Copyright 2019 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
// limitations under the License.
package register
import (
"errors"
"testing"
"github.com/golang/mock/gomock"
"github.com/sorintlab/stolon/internal/cluster"
"github.com/sorintlab/stolon/internal/mock/store"
)
func TestNewCluster(t *testing.T) {
t.Run("should return error returned by store when getting cluster data", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockStore := mock_store.NewMockStore(ctrl)
mockStore.EXPECT().GetClusterData(gomock.Any()).Return(nil, nil, errors.New("unable to fetch cluster data"))
_, err := NewCluster("test", Config{}, mockStore)
if err == nil || err.Error() != "unable to fetch cluster data" {
t.Errorf("expected unable to fetch cluster data error")
}
})
t.Run("should return error returned by if store returns nil as cluster data", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockStore := mock_store.NewMockStore(ctrl)
mockStore.EXPECT().GetClusterData(gomock.Any()).Return(nil, nil, nil)
_, err := NewCluster("test", Config{}, mockStore)
if err == nil || err.Error() != "no cluster data available" {
t.Errorf("expected no cluster data available error")
}
})
t.Run("should create new cluster", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
cd := &cluster.ClusterData{}
mockStore := mock_store.NewMockStore(ctrl)
mockStore.EXPECT().GetClusterData(gomock.Any()).Return(cd, nil, nil)
expected := Cluster{name: "test", cd: cd}
actual, err := NewCluster("test", Config{}, mockStore)
if expected.name != actual.name {
t.Errorf("expected name to be %s but got %s", expected.name, actual.name)
} else if expected.cd != actual.cd {
t.Errorf("expected cluster data to be %v but got %v", expected.cd, actual.cd)
}
if err != nil {
t.Errorf("haven't expected error when fetching cluster data")
}
})
}
func TestServiceInfos(t *testing.T) {
t.Run("should return error if cluster data not available", func(t *testing.T) {
cl := Cluster{cd: &cluster.ClusterData{}, name: "test"}
_, err := cl.ServiceInfos()
if err == nil || err.Error() != "cluster data not available" {
t.Errorf("expected cluster data not available error")
}
})
t.Run("should get all the healthy service infos form the cluster data", func(t *testing.T) {
master := &cluster.DB{UID: "master", Status: cluster.DBStatus{Healthy: true, ListenAddress: "127.0.0.1", Port: "5432"}}
slave := &cluster.DB{UID: "slave1", Status: cluster.DBStatus{Healthy: true, ListenAddress: "127.0.0.1", Port: "5433"}}
anotherSlave := &cluster.DB{UID: "slave2", Status: cluster.DBStatus{Healthy: false, ListenAddress: "127.0.0.1", Port: "5433"}}
cl := Cluster{
cd: &cluster.ClusterData{
DBs: map[string]*cluster.DB{"master": master, "slave1": slave, "slave2": anotherSlave},
Cluster: &cluster.Cluster{
Status: cluster.ClusterStatus{Master: "master"},
},
},
name: "test",
tagMasterAs: Tags{"master"},
tagSlaveAs: Tags{"slave"},
}
infos, err := cl.ServiceInfos()
if err != nil {
t.Error("expected no error")
}
if len(infos) != 2 {
t.Errorf("expected to have 2 service infos but got %d", len(infos))
}
expectedMasterInfo := ServiceInfo{
Name: "test",
Address: "127.0.0.1",
Port: 5432,
ID: "master",
Tags: Tags{"master"},
IsMaster: true,
}
expectedSlaveInfo := ServiceInfo{
Name: "test",
Address: "127.0.0.1",
Port: 5433,
ID: "slave1",
Tags: Tags{"slave"},
IsMaster: false,
}
actualMaster := infos["master"]
actualSlave := infos["slave1"]
if !expectedMasterInfo.Compare(actualMaster) {
t.Errorf("expected master to be %v but was %v", expectedMasterInfo, actualMaster)
}
if !expectedMasterInfo.IsMaster {
t.Errorf("expected isMaster to be %v but was %v", true, expectedMasterInfo.IsMaster)
}
if !expectedSlaveInfo.Compare(actualSlave) {
t.Errorf("expected slave to be %v but was %v", expectedSlaveInfo, actualSlave)
}
if expectedSlaveInfo.IsMaster {
t.Errorf("expected isMaster to be %v but was %v", false, expectedMasterInfo.IsMaster)
}
})
}

View File

@ -0,0 +1,77 @@
// Copyright 2019 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
package register
import (
"fmt"
"net/url"
"strings"
"github.com/hashicorp/consul/api"
)
// Config represents necessary configurations which can passed
// for registering master and slave info for service discovery
type Config struct {
Backend string
Endpoints string
Token string
TLSAddress string
TLSCAFile string
TLSCAPath string
TLSCertFile string
TLSKeyFile string
TLSInsecureSkipVerify bool
SleepInterval int
TagMasterAs string
TagSlaveAs string
RegisterMaster bool
}
// Validate returns nil if the config is valid, else returns error with
// appropriate reason
func (config *Config) Validate() error {
switch config.Backend {
case "consul":
addresses := strings.Split(config.Endpoints, ",")
if len(addresses) != 1 {
return fmt.Errorf("consul does not support multiple endpoints: %s", config.Endpoints)
}
_, err := url.Parse(config.Endpoints)
return err
default:
return fmt.Errorf("unknown register backend: %q", config.Backend)
}
}
// ConsulConfig returns consul.api.ConsulConfig if register endpoint is valid consul url
// else will return error with appropriate reason
func (config *Config) ConsulConfig() (*api.Config, error) {
url, err := url.Parse(config.Endpoints)
if err != nil {
return nil, err
}
return &api.Config{
Address: url.Host,
Scheme: url.Scheme,
TLSConfig: api.TLSConfig{
Address: config.TLSAddress,
CAFile: config.TLSCAFile,
CAPath: config.TLSCAPath,
CertFile: config.TLSCertFile,
KeyFile: config.TLSKeyFile,
InsecureSkipVerify: config.TLSInsecureSkipVerify,
},
}, nil
}

View File

@ -0,0 +1,99 @@
// Copyright 2019 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
package register
import "testing"
func TestRegisterConfig(t *testing.T) {
t.Run("validate", func(t *testing.T) {
t.Run("should check for consul register backend", func(t *testing.T) {
config := Config{Backend: "something other than consul"}
err := config.Validate()
if err == nil || err.Error() != "unknown register backend: \"something other than consul\"" {
t.Errorf("expected unknown register backend but got %s", err.Error())
}
})
t.Run("should not return any error if all valid configurations are specified", func(t *testing.T) {
config := Config{Backend: "consul"}
err := config.Validate()
if err != nil {
t.Errorf("expected no error but got '%v'", err.Error())
}
})
t.Run("should not support multiple addresses", func(t *testing.T) {
config := Config{Backend: "consul", Endpoints: "http://127.0.0.1:8500,http://127.0.0.2:8500"}
err := config.Validate()
if err == nil || err.Error() != "consul does not support multiple endpoints: http://127.0.0.1:8500,http://127.0.0.2:8500" {
t.Errorf("expected unknown register backend but got %s", err.Error())
}
})
})
t.Run("config", func(t *testing.T) {
t.Run("should return config", func(t *testing.T) {
c := Config{
Backend: "consul",
Endpoints: "https://127.0.0.1:8500",
TLSAddress: "address",
TLSCAFile: "ca-file",
TLSCAPath: "ca-path",
TLSCertFile: "cert-file",
TLSKeyFile: "key-file",
TLSInsecureSkipVerify: true,
}
config, err := c.ConsulConfig()
if err != nil {
t.Errorf("expected error to be nil but got %s", err.Error())
}
if config.Address != "127.0.0.1:8500" {
t.Errorf("expected address to be %s but got %s", c.Endpoints, config.Address)
}
if config.Scheme != "https" {
t.Errorf("expected address to be https but got %s", config.Scheme)
}
if config.TLSConfig.Address != "address" {
t.Errorf("expected tls address to be address but got %s", config.TLSConfig.Address)
}
if config.TLSConfig.CAFile != "ca-file" {
t.Errorf("expected tlsCaFile to be ca-file but got %s", config.TLSConfig.CAFile)
}
if config.TLSConfig.CAPath != "ca-path" {
t.Errorf("expected tlsCaPath to be ca-path but got %s", config.TLSConfig.CAPath)
}
if config.TLSConfig.CertFile != "cert-file" {
t.Errorf("expected tlsCertFile to be cert-file but got %s", config.TLSConfig.CertFile)
}
if config.TLSConfig.KeyFile != "key-file" {
t.Errorf("expected tlsKeyFile to be key-file but got %s", config.TLSConfig.KeyFile)
}
if config.TLSConfig.InsecureSkipVerify != true {
t.Errorf("expected tlsInsecureSkipVerify to be true but got %v", config.TLSConfig.InsecureSkipVerify)
}
})
})
}

View File

@ -0,0 +1,93 @@
// Copyright 2019 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
// limitations under the License.
package register
import (
"errors"
"github.com/hashicorp/consul/api"
)
// ServiceDiscovery helps to register service
type ServiceDiscovery interface {
Register(info *ServiceInfo) error
Services(name string) (ServiceInfos, error)
DeRegister(info *ServiceInfo) error
}
// NewServiceDiscovery creates a Discovery from registerBackend and registerEndpoints
func NewServiceDiscovery(config *Config) (ServiceDiscovery, error) {
switch config.Backend {
case "consul":
if apiConfig, err := config.ConsulConfig(); err != nil {
return nil, err
} else if client, err := api.NewClient(apiConfig); err != nil {
return nil, err
} else {
agent := client.Agent()
catalog := client.Catalog()
return NewConsulServiceDiscovery(agent, catalog), nil
}
default:
return nil, errors.New("register backend not supported")
}
}
// ConsulServiceDiscovery helps to register service to consul
type ConsulServiceDiscovery struct {
agent ConsulAgent
catalog ConsulCatalog
}
// ConsulAgent interface holds all the necessary methods to interact with consul agent
type ConsulAgent interface {
ServiceRegister(service *api.AgentServiceRegistration) error
ServiceDeregister(serviceID string) error
}
type ConsulCatalog interface {
Service(service, tag string, q *api.QueryOptions) ([]*api.CatalogService, *api.QueryMeta, error)
}
// NewConsulServiceDiscovery creates a new ConsulDiscovery
func NewConsulServiceDiscovery(agent ConsulAgent, catalog ConsulCatalog) ServiceDiscovery {
return &ConsulServiceDiscovery{agent: agent, catalog: catalog}
}
// Register registers the given service info to consul
func (cd *ConsulServiceDiscovery) Register(info *ServiceInfo) error {
return cd.agent.ServiceRegister(info.ConsulAgentServiceRegistration())
}
// DeRegister de-registers the given service info to consul
func (cd *ConsulServiceDiscovery) DeRegister(info *ServiceInfo) error {
return cd.agent.ServiceDeregister(info.ID)
}
// Services returns the services registered in the consul
func (cd *ConsulServiceDiscovery) Services(name string) (ServiceInfos, error) {
services, _, err := cd.catalog.Service(name, "", nil)
if err != nil {
return nil, err
}
result := ServiceInfos{}
for _, service := range services {
if service != nil {
consulService := NewServiceInfoFromConsulService(*service)
result[service.ServiceID] = consulService
}
}
return result, nil
}

View File

@ -0,0 +1,131 @@
// Copyright 2019 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
// limitations under the License.
package register_test
import (
"errors"
"testing"
"github.com/golang/mock/gomock"
"github.com/hashicorp/consul/api"
"github.com/sorintlab/stolon/cmd/stolonctl/cmd/internal/mock/register"
"github.com/sorintlab/stolon/cmd/stolonctl/cmd/register"
)
func TestNewServiceDiscovery(t *testing.T) {
t.Run("should return consul service discovery", func(t *testing.T) {
config := register.Config{Backend: "consul", Endpoints: "http://127.0.0.1"}
sd, err := register.NewServiceDiscovery(&config)
if err != nil {
t.Errorf("expected error to be nil but was %s", err.Error())
} else if sd == nil {
t.Errorf("expected service discovery not to be nil")
}
})
}
func TestConsulServiceDiscovery(t *testing.T) {
t.Run("should be able to register service info", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
client := mock_register.NewMockConsulAgent(ctrl)
catalog := mock_register.NewMockConsulCatalog(ctrl)
serviceDiscovery := register.NewConsulServiceDiscovery(client, catalog)
expectedServiceInfo := register.ServiceInfo{
Name: "service",
ID: "1",
Port: 5432,
Address: "127.0.0.1",
Tags: []string{"tag"},
Check: register.HealthCheck{
Interval: "10s",
TCP: "tcp",
},
}
client.EXPECT().ServiceRegister(expectedServiceInfo.ConsulAgentServiceRegistration())
err := serviceDiscovery.Register(&expectedServiceInfo)
if err != nil {
t.Errorf("expected error to be nil when registering service but got %s", err.Error())
}
})
t.Run("should return error returned by http client", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
client := mock_register.NewMockConsulAgent(ctrl)
catalog := mock_register.NewMockConsulCatalog(ctrl)
serviceDiscovery := register.NewConsulServiceDiscovery(client, catalog)
client.EXPECT().ServiceRegister(gomock.Any()).Return(errors.New("something went wrong"))
err := serviceDiscovery.Register(&register.ServiceInfo{})
if err == nil || err.Error() != "something went wrong" {
t.Errorf("expected error to be something went wrong")
}
})
}
func TestServices(t *testing.T) {
t.Run("should return valid service infos", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
client := mock_register.NewMockConsulAgent(ctrl)
catalog := mock_register.NewMockConsulCatalog(ctrl)
serviceDiscovery := register.NewConsulServiceDiscovery(client, catalog)
services := []*api.CatalogService{
{ServiceID: "masterUID", ServiceName: "test"},
{ServiceID: "slaveUID", ServiceName: "test"},
nil,
}
catalog.EXPECT().Service("test", "", nil).Return(services, nil, nil)
infos, err := serviceDiscovery.Services("test")
if err != nil {
t.Errorf("expected error to be nil but was %s", err.Error())
}
if len(infos) != 2 {
t.Errorf("expected 2 services but got %d", len(infos))
}
})
t.Run("should return error if service returns error", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
client := mock_register.NewMockConsulAgent(ctrl)
catalog := mock_register.NewMockConsulCatalog(ctrl)
serviceDiscovery := register.NewConsulServiceDiscovery(client, catalog)
catalog.EXPECT().Service("test", "", nil).Return(nil, nil, errors.New("service error"))
infos, err := serviceDiscovery.Services("test")
if err == nil {
t.Errorf("expected error to be service error but got nil")
}
if infos != nil {
t.Errorf("expected 2 services but got %d", len(infos))
}
})
}

View File

@ -0,0 +1,150 @@
// Copyright 2019 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
// limitations under the License.
package register
import (
"fmt"
"net"
"reflect"
"strconv"
"strings"
"github.com/hashicorp/consul/api"
"github.com/sorintlab/stolon/internal/cluster"
)
// HealthCheck holds necessary information for performing
// health check on the service
type HealthCheck struct {
TCP string
Interval string
}
// Tags represents various way a service can be tagged
type Tags []string
// NewTags convert comma separated string into Tags
func NewTags(from string) Tags {
return Tags(strings.Split(from, ","))
}
// Compare if two tags are equal
func (t Tags) Compare(tags Tags) bool {
return reflect.DeepEqual(t, tags)
}
// ServiceInfo holds the necessary information about a service
// for service discovery
type ServiceInfo struct {
Name string
Tags Tags
ID string
Address string
IsMaster bool
Port int
Check HealthCheck
}
// ConsulAgentServiceRegistration returns AgentServiceRegistration
func (info *ServiceInfo) ConsulAgentServiceRegistration() *api.AgentServiceRegistration {
check := api.AgentServiceCheck{
TCP: info.Check.TCP,
Interval: info.Check.Interval,
}
service := api.AgentServiceRegistration{
ID: info.ID,
Name: info.Name,
Address: info.Address,
Tags: info.Tags,
Port: info.Port,
Check: &check,
}
return &service
}
// Compare if two ServiceInfo are equal
func (info *ServiceInfo) Compare(target ServiceInfo) bool {
return info.Name == target.Name &&
info.ID == target.ID &&
info.Address == target.Address &&
info.Port == target.Port &&
info.Tags.Compare(target.Tags)
}
// NewServiceInfo return new ServiceInfo from name, db and tags
func NewServiceInfo(name string, db *cluster.DB, tags []string, isMaster bool) (*ServiceInfo, error) {
port, err := strconv.Atoi(db.Status.Port)
if err != nil {
return nil, fmt.Errorf(fmt.Sprintf("invalid database port '%s' for %s with uid %s", db.Status.Port, name, db.UID))
}
return &ServiceInfo{
Name: name,
Tags: tags,
Address: db.Status.ListenAddress,
ID: db.UID,
Port: port,
IsMaster: isMaster,
Check: HealthCheck{
TCP: net.JoinHostPort(db.Status.ListenAddress, db.Status.Port),
Interval: "10s",
},
}, nil
}
// NewServiceInfoFromConsulService returns ServiceInfo from consul service
func NewServiceInfoFromConsulService(service api.CatalogService) ServiceInfo {
return ServiceInfo{
Name: service.ServiceName,
Tags: service.ServiceTags,
ID: service.ServiceID,
Port: service.ServicePort,
Address: service.ServiceAddress,
}
}
// ServiceInfos represents holds collection of ServiceInfo
type ServiceInfos map[string]ServiceInfo
// ServiceInfoStatus holds the services which are newly added / removed
type ServiceInfoStatus struct {
Added ServiceInfos
Removed ServiceInfos
}
// Diff returns ServiceInfoStatus
func (existing ServiceInfos) Diff(discovered ServiceInfos) ServiceInfoStatus {
result := ServiceInfoStatus{Added: ServiceInfos{}, Removed: ServiceInfos{}}
for id, existingInfo := range existing {
if discoveredInfo, ok := discovered[id]; !ok {
result.Added[id] = existingInfo
} else {
if !existingInfo.Compare(discoveredInfo) {
result.Added[id] = existingInfo
result.Removed[id] = discoveredInfo
}
}
}
for id, discoveredInfo := range discovered {
if existingInfo, ok := existing[id]; !ok {
result.Removed[id] = discoveredInfo
} else {
if !discoveredInfo.Compare(existingInfo) {
result.Added[id] = existingInfo
result.Removed[id] = discoveredInfo
}
}
}
return result
}

View File

@ -0,0 +1,438 @@
// Copyright 2019 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
// limitations under the License.
package register
import (
"reflect"
"testing"
"github.com/hashicorp/consul/api"
"github.com/sorintlab/stolon/internal/cluster"
)
func TestNewServiceInfo(t *testing.T) {
t.Run("should join listen address and port", func(t *testing.T) {
tags := Tags{"tags"}
id := "unique"
actual, _ := NewServiceInfo("test", &cluster.DB{
UID: id,
Status: cluster.DBStatus{
ListenAddress: "127.0.0.1",
Port: "5432"},
}, tags, true)
if actual.Name != "test" {
t.Errorf("expected name to be %s but got %s", "test", actual.Name)
} else if !reflect.DeepEqual(tags, actual.Tags) {
t.Errorf("expected tags to be %v but got %v", tags, actual.Name)
} else if actual.Address != "127.0.0.1" {
t.Errorf("expected address to be %v but got %v", "127.0.0.1", actual.Address)
} else if actual.Port != 5432 {
t.Errorf("expected port to be %v but got %v", "5432", actual.Port)
} else if actual.ID != id {
t.Errorf("expected id to be %v but got %v", id, actual.ID)
} else if actual.IsMaster != true {
t.Errorf("expected isMaster to be %v but got %v", true, actual.ID)
}
})
t.Run("should return error if port is invalid", func(t *testing.T) {
actual, err := NewServiceInfo("test", &cluster.DB{
UID: "unique",
Status: cluster.DBStatus{
ListenAddress: "127.0.0.1",
Port: "cat"},
}, []string{}, true)
if actual != nil {
t.Errorf("expected service info to be nil")
} else if err == nil || err.Error() != "invalid database port 'cat' for test with uid unique" {
t.Errorf("expected invalid database port error but was %s", err.Error())
}
})
}
func TestConsulAgentServiceRegistration(t *testing.T) {
t.Run("should return consul agent service registration", func(t *testing.T) {
tags := Tags{"tags"}
id := "unique"
service, _ := NewServiceInfo("test", &cluster.DB{
UID: id,
Status: cluster.DBStatus{
ListenAddress: "127.0.0.1",
Port: "5432"},
}, tags, true)
actual := service.ConsulAgentServiceRegistration()
if actual == nil {
t.Errorf("expected consul agent service registration not to be nil")
}
if actual.ID != service.ID {
t.Errorf("expected id to be %s but was %s", service.ID, actual.ID)
} else if actual.Name != service.Name {
t.Errorf("expected name to be %s but was %s", service.Name, actual.Name)
} else if actual.Address != service.Address {
t.Errorf("expected Address to be %s but was %s", service.Address, actual.Name)
} else if !service.Tags.Compare(actual.Tags) {
t.Errorf("expected tags to be %v but was %v", service.Tags, actual.Tags)
} else if actual.Port != service.Port {
t.Errorf("expected port to be %d but was %d", service.Port, actual.Port)
} else if actual.Check.TCP != service.Check.TCP {
t.Errorf("expected check tcp to be %s but was %s", service.Check.TCP, actual.Check.TCP)
} else if actual.Check.Interval != service.Check.Interval {
t.Errorf("expected check interval to be %s but was %s", service.Check.Interval, actual.Check.Interval)
}
})
}
func TestNewServiceInfoFromConsulService(t *testing.T) {
t.Run("should return service info", func(t *testing.T) {
service := api.CatalogService{
ServiceName: "service",
ServiceTags: Tags{"one", "two"},
ServiceID: "id",
ServicePort: 1234,
ServiceAddress: "address",
}
info := NewServiceInfoFromConsulService(service)
if info.Name != service.ServiceName {
t.Errorf("expected name to be %s but was %s", service.ServiceName, info.Name)
} else if info.ID != service.ServiceID {
t.Errorf("expected id to be %s but was %s", service.ID, info.ID)
} else if !info.Tags.Compare(service.ServiceTags) {
t.Errorf("expected tags to be %v but was %v", service.ServiceTags, info.Tags)
} else if info.Port != service.ServicePort {
t.Errorf("expected port to be %d but was %d", service.ServicePort, info.Port)
} else if info.Address != service.ServiceAddress {
t.Errorf("expected address to be %s but was %s", service.ServiceAddress, info.Address)
}
})
}
func TestCompareTags(t *testing.T) {
t.Run("should return true if tags are equal", func(t *testing.T) {
tag1 := Tags{"slave", "master"}
tag2 := Tags{"slave", "master"}
if !tag1.Compare(tag2) {
t.Errorf("expected to be true")
}
})
t.Run("should return false if tags are not equal", func(t *testing.T) {
tag1 := Tags{"slave", "master"}
tag2 := Tags{"slave"}
if tag1.Compare(tag2) {
t.Errorf("expected to be false")
}
})
}
func TestCompareServiceInfo(t *testing.T) {
t.Run("should return true if ServiceInfo(s) are equal", func(t *testing.T) {
serviceInfo := ServiceInfo{
Name: "test",
Address: "127.0.0.1",
Port: 5432,
ID: "master",
Tags: Tags{"master"},
}
anotherServiceInfo := ServiceInfo{
Name: "test",
Address: "127.0.0.1",
Port: 5432,
ID: "master",
Tags: Tags{"master"},
}
if !serviceInfo.Compare(anotherServiceInfo) {
t.Errorf("expected to be true")
}
})
t.Run("should return false if service names are not equal", func(t *testing.T) {
serviceInfo := ServiceInfo{
Name: "test",
Address: "127.0.0.1",
Port: 5432,
ID: "master",
Tags: Tags{"master"},
}
anotherServiceInfo := ServiceInfo{
Name: "test1",
Address: "127.0.0.1",
Port: 5432,
ID: "master",
Tags: Tags{"master"},
}
if serviceInfo.Compare(anotherServiceInfo) {
t.Errorf("expected to be false")
}
})
t.Run("should return false if service address are not equal", func(t *testing.T) {
serviceInfo := ServiceInfo{
Name: "test",
Address: "127.0.0.2",
Port: 5432,
ID: "master",
Tags: Tags{"master"},
}
anotherServiceInfo := ServiceInfo{
Name: "test",
Address: "127.0.0.1",
Port: 5432,
ID: "master",
Tags: Tags{"master"},
}
if serviceInfo.Compare(anotherServiceInfo) {
t.Errorf("expected to be false")
}
})
t.Run("should return false if service port are not equal", func(t *testing.T) {
serviceInfo := ServiceInfo{
Name: "test",
Address: "127.0.0.1",
Port: 5433,
ID: "master",
Tags: Tags{"master"},
}
anotherServiceInfo := ServiceInfo{
Name: "test",
Address: "127.0.0.1",
Port: 5432,
ID: "master",
Tags: Tags{"master"},
}
if serviceInfo.Compare(anotherServiceInfo) {
t.Errorf("expected to be false")
}
})
t.Run("should return false if Ids are not equal", func(t *testing.T) {
serviceInfo := ServiceInfo{
Name: "test",
Address: "127.0.0.1",
Port: 5432,
ID: "slave",
Tags: Tags{"master"},
}
anotherServiceInfo := ServiceInfo{
Name: "test",
Address: "127.0.0.1",
Port: 5432,
ID: "master",
Tags: Tags{"master"},
}
if serviceInfo.Compare(anotherServiceInfo) {
t.Errorf("expected to be false")
}
})
t.Run("should return false if tags are not equal", func(t *testing.T) {
serviceInfo := ServiceInfo{
Name: "test",
Address: "127.0.0.1",
Port: 5432,
ID: "master",
Tags: Tags{"master", "sdf"},
}
anotherServiceInfo := ServiceInfo{
Name: "test",
Address: "127.0.0.1",
Port: 5432,
ID: "master",
Tags: Tags{"master"},
}
if serviceInfo.Compare(anotherServiceInfo) {
t.Errorf("expected to be false")
}
})
}
func TestServiceInfosDiff(t *testing.T) {
t.Run("should not add or remove when discovered and existing services are empty", func(t *testing.T) {
discoveredServiceInfos := ServiceInfos{}
existingServiceInfos := ServiceInfos{}
diff := existingServiceInfos.Diff(discoveredServiceInfos)
if len(diff.Added) != 0 {
t.Errorf("expected no service to be added but %d service got added", len(diff.Added))
}
if len(diff.Removed) != 0 {
t.Errorf("expected no service to be removed but %d service got removed", len(diff.Removed))
}
})
t.Run("should only add when new services are found", func(t *testing.T) {
discoveredServiceInfos := ServiceInfos{}
existingServiceInfos := ServiceInfos{"masterUID": ServiceInfo{}, "slaveUID": ServiceInfo{}}
diff := existingServiceInfos.Diff(discoveredServiceInfos)
if len(diff.Added) != 2 {
t.Errorf("expected 2 service to be added but %d service got added", len(diff.Added))
}
if len(diff.Removed) != 0 {
t.Errorf("expected no service to be removed but %d service got added", len(diff.Removed))
}
for _, id := range []string{"masterUID", "slaveUID"} {
_, ok := diff.Added[id]
if !ok {
t.Errorf("expected %s to be added but not", id)
}
}
})
t.Run("should only remove when discovered services no longer exists", func(t *testing.T) {
discoveredServiceInfos := ServiceInfos{"masterUID": ServiceInfo{ID: "masterUID"}, "slaveUID": ServiceInfo{ID: "slaveUID"}}
existingServiceInfos := ServiceInfos{}
diff := existingServiceInfos.Diff(discoveredServiceInfos)
if len(diff.Added) != 0 {
t.Errorf("expected no service to be added but %d service got added", len(diff.Added))
}
if len(diff.Removed) != 2 {
t.Errorf("expected 2 service to be removed but %d service got added", len(diff.Removed))
}
for _, id := range []string{"masterUID", "slaveUID"} {
_, ok := diff.Removed[id]
if !ok {
t.Errorf("expected %s to be removed but not", id)
}
}
})
t.Run("should not add or remove when discovered and existing services are same", func(t *testing.T) {
discoveredServiceInfos := ServiceInfos{"masterUID": ServiceInfo{}, "slaveUID": ServiceInfo{}}
existingServiceInfos := ServiceInfos{"masterUID": ServiceInfo{}, "slaveUID": ServiceInfo{}}
diff := existingServiceInfos.Diff(discoveredServiceInfos)
if len(diff.Added) != 0 {
t.Errorf("expected no service to be added but %d service got added", len(diff.Added))
}
if len(diff.Removed) != 0 {
t.Errorf("expected no service to be removed but %d service got removed", len(diff.Removed))
}
})
t.Run("should add and remove corresponding service infos", func(t *testing.T) {
discoveredServiceInfos := ServiceInfos{"masterUID": ServiceInfo{ID: "masterUID"}, "slaveUID": ServiceInfo{ID: "slaveUID"}}
existingServiceInfos := ServiceInfos{"newSlaveUID": ServiceInfo{ID: "newSlaveUID"}, "slaveUID": ServiceInfo{ID: "slaveUID"}}
diff := existingServiceInfos.Diff(discoveredServiceInfos)
if len(diff.Added) != 1 {
t.Errorf("expected 1 service to be added but %d service got added", len(diff.Added))
}
if len(diff.Removed) != 1 {
t.Errorf("expected 1 service to be removed but %d service got added", len(diff.Removed))
}
for _, id := range []string{"masterUID"} {
_, ok := diff.Removed[id]
if !ok {
t.Errorf("expected %s to be removed but not", id)
}
}
for _, id := range []string{"newSlaveUID"} {
_, ok := diff.Added[id]
if !ok {
t.Errorf("expected %s to be added but not", id)
}
}
})
t.Run("should add and remove corresponding service infos", func(t *testing.T) {
discoveredServiceInfos := ServiceInfos{"masterUID": ServiceInfo{ID: "masterUID", Tags: Tags{"master"}}, "slaveUID": ServiceInfo{ID: "slaveUID"},
"anotherSlaveUID": ServiceInfo{ID: "anotherSlaveUID"}}
existingServiceInfos := ServiceInfos{"masterUID": ServiceInfo{ID: "masterUID", Tags: Tags{"slave"}}, "slaveUID": ServiceInfo{ID: "slaveUID"},
"anotherSlaveUID": ServiceInfo{ID: "anotherSlaveUID", Tags: Tags{"master"}}}
diff := existingServiceInfos.Diff(discoveredServiceInfos)
if len(diff.Added) != 2 {
t.Errorf("expected 2 service to be added but %d service got added", len(diff.Added))
}
if len(diff.Removed) != 2 {
t.Errorf("expected 2 service to be removed but %d service got added", len(diff.Removed))
}
for _, id := range []string{"masterUID", "anotherSlaveUID"} {
_, ok := diff.Removed[id]
if !ok {
t.Errorf("expected %s to be removed but not", id)
}
}
for _, id := range []string{"masterUID", "anotherSlaveUID"} {
_, ok := diff.Added[id]
if !ok {
t.Errorf("expected %s to be added but not", id)
}
}
})
}
func TestNewTags(t *testing.T) {
t.Run("should split comma separated string into tags", func(t *testing.T) {
from := "one,two"
actual := NewTags(from)
if len(actual) != 2 {
t.Errorf("expected 2 tags but got %d", len(actual))
}
if actual[0] != "one" {
t.Errorf("expected to equal to one but was %s", actual[0])
}
if actual[1] != "two" {
t.Errorf("expected to equal to one but was %s", actual[1])
}
})
t.Run("should not error if strings are not comma separated", func(t *testing.T) {
from := "one;two"
actual := NewTags(from)
if len(actual) != 1 {
t.Errorf("expected 1 tags but got %d", len(actual))
}
})
t.Run("should not error if string is empty", func(t *testing.T) {
from := ""
actual := NewTags(from)
if len(actual) != 1 {
t.Errorf("expected 1 tags but got %d", len(actual))
}
})
}

View File

@ -0,0 +1,333 @@
// Copyright 2019 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
package cmd
import (
"github.com/golang/mock/gomock"
"github.com/sorintlab/stolon/cmd/stolonctl/cmd/internal/mock/register"
"github.com/sorintlab/stolon/internal/cluster"
"github.com/sorintlab/stolon/internal/mock/store"
"github.com/sorintlab/stolon/internal/store"
"testing"
"github.com/sorintlab/stolon/cmd"
"github.com/sorintlab/stolon/cmd/stolonctl/cmd/register"
)
func TestCheckConfig(t *testing.T) {
t.Run("should check for cluster name", func(t *testing.T) {
c := config{}
rc := register.Config{}
err := checkConfig(&c, &rc)
if err == nil || err.Error() != "cluster name required" {
t.Errorf("expected cluster name required error")
}
})
t.Run("should check for store backend", func(t *testing.T) {
c := config{CommonConfig: cmd.CommonConfig{ClusterName: "test"}}
rc := register.Config{}
err := checkConfig(&c, &rc)
if err == nil || err.Error() != "store backend type required" {
t.Errorf("expected store backend type required error")
}
})
t.Run("should check for consul register backend", func(t *testing.T) {
c := config{CommonConfig: cmd.CommonConfig{ClusterName: "test", StoreBackend: "consul"}}
rc := register.Config{Backend: "something other than consul"}
err := checkConfig(&c, &rc)
if err == nil || err.Error() != "unknown register backend: \"something other than consul\"" {
t.Errorf("expected unknown register backend but got %s", err.Error())
}
})
t.Run("should not return any error if all valid configurations are specified", func(t *testing.T) {
c := config{CommonConfig: cmd.CommonConfig{ClusterName: "test", StoreBackend: "consul"}}
rc := register.Config{Backend: "consul"}
err := checkConfig(&c, &rc)
if err != nil {
t.Errorf("expected no error but got '%v'", err.Error())
}
})
}
func TestCheckAndRegisterMasterAndSlaves(t *testing.T) {
t.Run("should deregister all the discovered services", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockStore := mock_store.NewMockStore(ctrl)
mockServiceDiscovery := mock_register.NewMockServiceDiscovery(ctrl)
clusterName := "test-cluster"
serviceInfo := register.ServiceInfo{Name: clusterName, ID: "uid1", Tags: []string{"slave"}}
anotherServiceInfo := register.ServiceInfo{Name: clusterName, ID: "uid2", Tags: []string{"slave"}}
discoveredServices := register.ServiceInfos{
"uid1": serviceInfo,
"uid2": anotherServiceInfo,
}
mockServiceDiscovery.EXPECT().Services(clusterName).Return(discoveredServices, nil)
clusterData := cluster.ClusterData{
Cluster: &cluster.Cluster{},
DBs: cluster.DBs{},
}
mockStore.EXPECT().GetClusterData(gomock.Any()).Return(&clusterData, &store.KVPair{}, nil)
mockServiceDiscovery.EXPECT().DeRegister(&anotherServiceInfo)
mockServiceDiscovery.EXPECT().DeRegister(&serviceInfo)
checkAndRegisterMasterAndSlaves(clusterName, mockStore, mockServiceDiscovery, false)
})
t.Run("should register all the existing services", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockStore := mock_store.NewMockStore(ctrl)
mockServiceDiscovery := mock_register.NewMockServiceDiscovery(ctrl)
clusterName := "test-cluster"
serviceInfo := register.ServiceInfo{Name: clusterName, ID: "uid1", Port: 5432, Tags: []string{"slave"}, Check: register.HealthCheck{TCP: ":5432", Interval: "10s"}}
anotherServiceInfo := register.ServiceInfo{Name: clusterName, Port: 5433, ID: "uid2", Tags: []string{"slave"}, Check: register.HealthCheck{TCP: ":5433", Interval: "10s"}}
discoveredServices := register.ServiceInfos{}
mockServiceDiscovery.EXPECT().Services(clusterName).Return(discoveredServices, nil)
clusterData := cluster.ClusterData{
Cluster: &cluster.Cluster{},
DBs: cluster.DBs{
"uid1": &cluster.DB{
UID: "uid1",
Status: cluster.DBStatus{Port: "5432", Healthy: true},
},
"uid2": &cluster.DB{
UID: "uid2",
Status: cluster.DBStatus{Port: "5433", Healthy: true},
},
},
}
mockStore.EXPECT().GetClusterData(gomock.Any()).Return(&clusterData, &store.KVPair{}, nil)
mockServiceDiscovery.EXPECT().Register(&serviceInfo)
mockServiceDiscovery.EXPECT().Register(&anotherServiceInfo)
checkAndRegisterMasterAndSlaves(clusterName, mockStore, mockServiceDiscovery, false)
})
t.Run("should register existing services and deregister the discovered service which are no longer available", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockStore := mock_store.NewMockStore(ctrl)
mockServiceDiscovery := mock_register.NewMockServiceDiscovery(ctrl)
clusterName := "test-cluster"
serviceInfo := register.ServiceInfo{Name: clusterName, ID: "uid1", Port: 5432, Tags: []string{"slave"}, Check: register.HealthCheck{TCP: ":5432", Interval: "10s"}}
anotherServiceInfo := register.ServiceInfo{Name: clusterName, Port: 5433, ID: "uid2", Tags: []string{"slave"}, Check: register.HealthCheck{TCP: ":5433", Interval: "10s"}}
yetAnotherServiceInfo := register.ServiceInfo{Name: clusterName, Port: 5434, ID: "uid3", Tags: []string{"slave"}, Check: register.HealthCheck{TCP: ":5433", Interval: "10s"}}
discoveredServices := register.ServiceInfos{"uid1": serviceInfo, "uid2": anotherServiceInfo, "uid3": yetAnotherServiceInfo}
mockServiceDiscovery.EXPECT().Services(clusterName).Return(discoveredServices, nil)
clusterData := cluster.ClusterData{
Cluster: &cluster.Cluster{},
DBs: cluster.DBs{
"uid1": &cluster.DB{
UID: "uid1",
Status: cluster.DBStatus{Port: "5432", Healthy: true},
},
"uid2": &cluster.DB{
UID: "uid2",
Status: cluster.DBStatus{Port: "5433", Healthy: true},
},
},
}
mockStore.EXPECT().GetClusterData(gomock.Any()).Return(&clusterData, &store.KVPair{}, nil)
mockServiceDiscovery.EXPECT().DeRegister(&yetAnotherServiceInfo)
checkAndRegisterMasterAndSlaves(clusterName, mockStore, mockServiceDiscovery, false)
})
t.Run("master registration is not allowed", func(t *testing.T) {
t.Run("should deregister the master even it exists", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockStore := mock_store.NewMockStore(ctrl)
mockServiceDiscovery := mock_register.NewMockServiceDiscovery(ctrl)
clusterName := "test-cluster"
serviceInfo := register.ServiceInfo{Name: clusterName, ID: "uid1", Port: 5432, Tags: []string{"slave"}, Check: register.HealthCheck{TCP: ":5432", Interval: "10s"}}
masterServiceInfo := register.ServiceInfo{Name: clusterName, Port: 5433, ID: "uid2", Tags: []string{"slave"}, Check: register.HealthCheck{TCP: ":5433", Interval: "10s"}, IsMaster: true}
discoveredServices := register.ServiceInfos{"uid1": serviceInfo, "uid2": masterServiceInfo}
mockServiceDiscovery.EXPECT().Services(clusterName).Return(discoveredServices, nil)
clusterData := cluster.ClusterData{
Cluster: &cluster.Cluster{Status: cluster.ClusterStatus{Master: "uid2"}},
DBs: cluster.DBs{
"uid1": &cluster.DB{
UID: "uid1",
Status: cluster.DBStatus{Port: "5432", Healthy: true},
},
"uid2": &cluster.DB{
UID: "uid2",
Status: cluster.DBStatus{Port: "5433", Healthy: true},
},
},
}
mockStore.EXPECT().GetClusterData(gomock.Any()).Return(&clusterData, &store.KVPair{}, nil)
mockServiceDiscovery.EXPECT().DeRegister(&masterServiceInfo)
checkAndRegisterMasterAndSlaves(clusterName, mockStore, mockServiceDiscovery, false)
})
t.Run("should deregister if discovered service is master", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockStore := mock_store.NewMockStore(ctrl)
mockServiceDiscovery := mock_register.NewMockServiceDiscovery(ctrl)
clusterName := "test-cluster"
masterService := register.ServiceInfo{IsMaster: true, Name: clusterName, Port: 5434, ID: "uid2", Tags: []string{"master"}, Check: register.HealthCheck{TCP: ":5433", Interval: "10s"}}
discoveredServices := register.ServiceInfos{"uid3": masterService}
mockServiceDiscovery.EXPECT().Services(clusterName).Return(discoveredServices, nil)
clusterData := cluster.ClusterData{
Cluster: &cluster.Cluster{Status: cluster.ClusterStatus{Master: "uid2"}},
DBs: cluster.DBs{
"uid2": &cluster.DB{
UID: "uid2",
Status: cluster.DBStatus{Port: "5433", Healthy: true},
},
},
}
mockStore.EXPECT().GetClusterData(gomock.Any()).Return(&clusterData, &store.KVPair{}, nil)
mockServiceDiscovery.EXPECT().DeRegister(&masterService)
checkAndRegisterMasterAndSlaves(clusterName, mockStore, mockServiceDiscovery, false)
})
t.Run("should not register if existing service is master", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockStore := mock_store.NewMockStore(ctrl)
mockServiceDiscovery := mock_register.NewMockServiceDiscovery(ctrl)
clusterName := "test-cluster"
discoveredServices := register.ServiceInfos{}
mockServiceDiscovery.EXPECT().Services(clusterName).Return(discoveredServices, nil)
clusterData := cluster.ClusterData{
Cluster: &cluster.Cluster{Status: cluster.ClusterStatus{Master: "uid1"}},
DBs: cluster.DBs{
"uid1": &cluster.DB{
UID: "uid1",
Status: cluster.DBStatus{Port: "5432", Healthy: true},
},
},
}
mockStore.EXPECT().GetClusterData(gomock.Any()).Return(&clusterData, &store.KVPair{}, nil)
checkAndRegisterMasterAndSlaves(clusterName, mockStore, mockServiceDiscovery, false)
})
})
t.Run("master registration is allowed", func(t *testing.T) {
t.Run("should register the master if its discovered", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockStore := mock_store.NewMockStore(ctrl)
mockServiceDiscovery := mock_register.NewMockServiceDiscovery(ctrl)
clusterName := "test-cluster"
serviceInfo := register.ServiceInfo{Name: clusterName, ID: "uid1", Port: 5432, Tags: []string{"slave"}, Check: register.HealthCheck{TCP: ":5432", Interval: "10s"}}
masterServiceInfo := register.ServiceInfo{Name: clusterName, Port: 5433, ID: "uid2", Tags: []string{"master"}, Check: register.HealthCheck{TCP: ":5433", Interval: "10s"}, IsMaster: true}
discoveredServices := register.ServiceInfos{"uid1": serviceInfo}
mockServiceDiscovery.EXPECT().Services(clusterName).Return(discoveredServices, nil)
clusterData := cluster.ClusterData{
Cluster: &cluster.Cluster{Status: cluster.ClusterStatus{Master: "uid2"}},
DBs: cluster.DBs{
"uid1": &cluster.DB{
UID: "uid1",
Status: cluster.DBStatus{Port: "5432", Healthy: true},
},
"uid2": &cluster.DB{
UID: "uid2",
Status: cluster.DBStatus{Port: "5433", Healthy: true},
},
},
}
mockStore.EXPECT().GetClusterData(gomock.Any()).Return(&clusterData, &store.KVPair{}, nil)
mockServiceDiscovery.EXPECT().Register(&masterServiceInfo)
checkAndRegisterMasterAndSlaves(clusterName, mockStore, mockServiceDiscovery, true)
})
t.Run("should deregister if non existing is master", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockStore := mock_store.NewMockStore(ctrl)
mockServiceDiscovery := mock_register.NewMockServiceDiscovery(ctrl)
clusterName := "test-cluster"
masterService := register.ServiceInfo{IsMaster: true, Name: clusterName, Port: 5434, ID: "uid3", Tags: []string{"slave"}, Check: register.HealthCheck{TCP: ":5433", Interval: "10s"}}
discoveredServices := register.ServiceInfos{"uid3": masterService}
mockServiceDiscovery.EXPECT().Services(clusterName).Return(discoveredServices, nil)
clusterData := cluster.ClusterData{
Cluster: &cluster.Cluster{},
DBs: cluster.DBs{},
}
mockStore.EXPECT().GetClusterData(gomock.Any()).Return(&clusterData, &store.KVPair{}, nil)
mockServiceDiscovery.EXPECT().DeRegister(&masterService)
checkAndRegisterMasterAndSlaves(clusterName, mockStore, mockServiceDiscovery, true)
})
t.Run("should register if existing service is master", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockStore := mock_store.NewMockStore(ctrl)
mockServiceDiscovery := mock_register.NewMockServiceDiscovery(ctrl)
clusterName := "test-cluster"
discoveredServices := register.ServiceInfos{}
masterService := register.ServiceInfo{IsMaster: true, Name: clusterName, Port: 5432, ID: "uid1", Tags: []string{"master"}, Check: register.HealthCheck{TCP: ":5432", Interval: "10s"}}
mockServiceDiscovery.EXPECT().Services(clusterName).Return(discoveredServices, nil)
clusterData := cluster.ClusterData{
Cluster: &cluster.Cluster{Status: cluster.ClusterStatus{Master: "uid1"}},
DBs: cluster.DBs{
"uid1": &cluster.DB{
UID: "uid1",
Status: cluster.DBStatus{Port: "5432", Healthy: true},
},
},
}
mockStore.EXPECT().GetClusterData(gomock.Any()).Return(&clusterData, &store.KVPair{}, nil)
mockServiceDiscovery.EXPECT().Register(&masterService)
checkAndRegisterMasterAndSlaves(clusterName, mockStore, mockServiceDiscovery, true)
})
})
}

View File

@ -20,6 +20,7 @@ We suggest that you first read the [Stolon Architecture and Requirements](archit
* [Enabling synchronous replication](syncrepl.md)
* [PostgreSQL SSL/TLS setup](ssl.md)
* [Forcing a failover](forcefailover.md)
* [Service Discovery](service_discovery.md)
### Recipes

View File

@ -36,10 +36,11 @@ stolonctl [flags]
* [stolonctl failkeeper](stolonctl_failkeeper.md) - Force keeper as "temporarily" failed. The sentinel will compute a new clusterdata considering it as failed and then restore its state to the real one.
* [stolonctl init](stolonctl_init.md) - Initialize a new cluster
* [stolonctl promote](stolonctl_promote.md) - Promotes a standby cluster to a primary cluster
* [stolonctl register](stolonctl_register.md) - Register stolon keepers for service discovery
* [stolonctl removekeeper](stolonctl_removekeeper.md) - Removes keeper from cluster data
* [stolonctl spec](stolonctl_spec.md) - Retrieve the current cluster specification
* [stolonctl status](stolonctl_status.md) - Display the current cluster status
* [stolonctl update](stolonctl_update.md) - Update a cluster specification
* [stolonctl version](stolonctl_version.md) - Display the version
###### Auto generated by spf13/cobra on 24-Nov-2018
###### Auto generated by spf13/cobra on 1-Mar-2019

View File

@ -0,0 +1,53 @@
## stolonctl register
Register stolon keepers for service discovery
### Synopsis
Register stolon keepers for service discovery
```
stolonctl register [flags]
```
### Options
```
--debug enable debug logging
-h, --help help for register
--register-backend string register backend type (consul) (default "consul")
--register-ca-file string verify certificates of HTTPS-enabled register servers using this CA bundle
--register-cert-file string certificate file for client identification to the register
--register-endpoints string a comma-delimited list of register endpoints (use https scheme for tls communication) defaults: http://127.0.0.1:8500 for consul (default "http://127.0.0.1:8500")
--register-key string private key file for client identification to the register
--register-master register master as well for service discovery (use it with caution!!!)
--register-skip-tls-verify skip register certificate verification (insecure!!!)
--sleep-interval int number of seconds to sleep before probing for change (default 10)
--tag-master-as string a comma-delimited list of tag to be used when registering master (default "master")
--tag-slave-as string a comma-delimited list of tag to be used when registering slave (default "slave")
```
### Options inherited from parent commands
```
--cluster-name string cluster name
--kube-context string name of the kubeconfig context to use
--kube-namespace string name of the kubernetes namespace to use
--kube-resource-kind string the k8s resource kind to be used to store stolon clusterdata and do sentinel leader election (only "configmap" is currently supported)
--kubeconfig string path to kubeconfig file. Overrides $KUBECONFIG
--log-level string debug, info (default), warn or error (default "info")
--metrics-listen-address string metrics listen address i.e "0.0.0.0:8080" (disabled by default)
--store-backend string store backend type (etcdv2/etcd, etcdv3, consul or kubernetes)
--store-ca-file string verify certificates of HTTPS-enabled store servers using this CA bundle
--store-cert-file string certificate file for client identification to the store
--store-endpoints string a comma-delimited list of store endpoints (use https scheme for tls communication) (defaults: http://127.0.0.1:2379 for etcd, http://127.0.0.1:8500 for consul)
--store-key string private key file for client identification to the store
--store-prefix string the store base prefix (default "stolon/cluster")
--store-skip-tls-verify skip store certificate verification (insecure!!!)
```
### SEE ALSO
* [stolonctl](stolonctl.md) - stolon command line client
###### Auto generated by spf13/cobra on 1-Mar-2019

11
doc/service_discovery.md Normal file
View File

@ -0,0 +1,11 @@
## Service Discovery
Service discovery helps in discovering stolon standby keepers for performing readonly queries
![Stolon architecture](stolon_service_discovery.png)
`register` watches the keeper and registers them in `consul` for service discovery.
`clients` which want to do readonly query can find the standby keepers by querying `consul`
> use `--register-master` flag to register master for performing readonly queries, don't use it for write queries, always use `proxy` to connect to master, refer [here](./faq.md#why-clients-should-use-the-stolon-proxy) for more info

Binary file not shown.

After

Width:  |  Height:  |  Size: 152 KiB

2
go.mod
View File

@ -34,7 +34,7 @@ require (
github.com/gregjones/httpcache v0.0.0-20171119193500-2bcd89a1743f // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.5.1 // indirect
github.com/hashicorp/consul v1.0.2 // indirect
github.com/hashicorp/consul v1.0.2
github.com/hashicorp/go-cleanhttp v0.0.0-20160217214820-875fb671b3dd // indirect
github.com/hashicorp/go-immutable-radix v1.0.0 // indirect
github.com/hashicorp/go-msgpack v0.0.0-20150518234257-fa3f63826f7c // indirect