diff --git a/cmd/common.go b/cmd/common.go index d1c1c63..abc71f4 100644 --- a/cmd/common.go +++ b/cmd/common.go @@ -34,7 +34,7 @@ type CommonConfig struct { } func AddCommonFlags(cmd *cobra.Command, cfg *CommonConfig) { - cmd.PersistentFlags().StringVar(&cfg.StoreBackend, "store-backend", "", "store backend type (etcd or consul)") + cmd.PersistentFlags().StringVar(&cfg.StoreBackend, "store-backend", "", "store backend type (etcdv2/etcd, etcdv3 or consul)") cmd.PersistentFlags().StringVar(&cfg.StoreEndpoints, "store-endpoints", "", "a comma-delimited list of store endpoints (use https scheme for tls communication) (defaults: http://127.0.0.1:2379 for etcd, http://127.0.0.1:8500 for consul)") cmd.PersistentFlags().StringVar(&cfg.StoreCertFile, "store-cert-file", "", "certificate file for client identification to the store") cmd.PersistentFlags().StringVar(&cfg.StoreKeyFile, "store-key", "", "private key file for client identification to the store") @@ -54,6 +54,10 @@ func CheckCommonConfig(cfg *CommonConfig) error { switch cfg.StoreBackend { case "consul": case "etcd": + // etcd is old alias for etcdv2 + cfg.StoreBackend = "etcdv2" + case "etcdv2": + case "etcdv3": default: return fmt.Errorf("Unknown store backend: %q", cfg.StoreBackend) } diff --git a/cmd/keeper/keeper.go b/cmd/keeper/keeper.go index 363cfe6..2c106dd 100644 --- a/cmd/keeper/keeper.go +++ b/cmd/keeper/keeper.go @@ -364,7 +364,7 @@ type PostgresKeeper struct { sleepInterval time.Duration requestTimeout time.Duration - e *store.StoreManager + e *store.Store pgm *postgresql.Manager stop chan bool end chan error @@ -381,7 +381,7 @@ type PostgresKeeper struct { func NewPostgresKeeper(cfg *config, stop chan bool, end chan error) (*PostgresKeeper, error) { storePath := filepath.Join(common.StoreBasePath, cfg.ClusterName) - kvstore, err := store.NewStore(store.Config{ + kvstore, err := store.NewKVStore(store.Config{ Backend: store.Backend(cfg.StoreBackend), Endpoints: cfg.StoreEndpoints, CertFile: cfg.StoreCertFile, @@ -392,7 +392,7 @@ func NewPostgresKeeper(cfg *config, stop chan bool, end chan error) (*PostgresKe if err != nil { return nil, fmt.Errorf("cannot create store: %v", err) } - e := store.NewStoreManager(kvstore, storePath) + e := store.NewStore(kvstore, storePath) // Clean and get absolute datadir path dataDir, err := filepath.Abs(cfg.dataDir) @@ -480,7 +480,7 @@ func (p *PostgresKeeper) updateKeeperInfo() error { // The time to live is just to automatically remove old entries, it's // not used to determine if the keeper info has been updated. - if err := p.e.SetKeeperInfo(keeperUID, keeperInfo, p.sleepInterval); err != nil { + if err := p.e.SetKeeperInfo(context.TODO(), keeperUID, keeperInfo, p.sleepInterval); err != nil { return err } return nil @@ -651,7 +651,7 @@ func (p *PostgresKeeper) Start() { var err error var cd *cluster.ClusterData - cd, _, err = p.e.GetClusterData() + cd, _, err = p.e.GetClusterData(context.TODO()) if err != nil { log.Errorw("error retrieving cluster data", zap.Error(err)) } else if cd != nil { @@ -841,7 +841,7 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) { e := p.e pgm := p.pgm - cd, _, err := e.GetClusterData() + cd, _, err := e.GetClusterData(pctx) if err != nil { log.Errorw("error retrieving cluster data", zap.Error(err)) return diff --git a/cmd/proxy/proxy.go b/cmd/proxy/proxy.go index 06ef94e..f75cf7b 100644 --- a/cmd/proxy/proxy.go +++ b/cmd/proxy/proxy.go @@ -15,6 +15,7 @@ package main import ( + "context" "fmt" "net" "os" @@ -100,7 +101,7 @@ type ClusterChecker struct { listener *net.TCPListener pp *pollon.Proxy - e *store.StoreManager + e *store.Store endPollonProxyCh chan error pollonMutex sync.Mutex @@ -109,7 +110,7 @@ type ClusterChecker struct { func NewClusterChecker(uid string, cfg config) (*ClusterChecker, error) { storePath := filepath.Join(common.StoreBasePath, cfg.ClusterName) - kvstore, err := store.NewStore(store.Config{ + kvstore, err := store.NewKVStore(store.Config{ Backend: store.Backend(cfg.StoreBackend), Endpoints: cfg.StoreEndpoints, CertFile: cfg.StoreCertFile, @@ -120,7 +121,7 @@ func NewClusterChecker(uid string, cfg config) (*ClusterChecker, error) { if err != nil { return nil, fmt.Errorf("cannot create store: %v", err) } - e := store.NewStoreManager(kvstore, storePath) + e := store.NewStore(kvstore, storePath) return &ClusterChecker{ uid: uid, @@ -189,14 +190,14 @@ func (c *ClusterChecker) sendPollonConfData(confData pollon.ConfData) { } } -func (c *ClusterChecker) SetProxyInfo(e *store.StoreManager, generation int64, ttl time.Duration) error { +func (c *ClusterChecker) SetProxyInfo(e *store.Store, generation int64, ttl time.Duration) error { proxyInfo := &cluster.ProxyInfo{ UID: c.uid, Generation: generation, } log.Debugf("proxyInfo dump: %s", spew.Sdump(proxyInfo)) - if err := c.e.SetProxyInfo(proxyInfo, ttl); err != nil { + if err := c.e.SetProxyInfo(context.TODO(), proxyInfo, ttl); err != nil { return err } return nil @@ -204,7 +205,7 @@ func (c *ClusterChecker) SetProxyInfo(e *store.StoreManager, generation int64, t // Check reads the cluster data and applies the right pollon configuration. func (c *ClusterChecker) Check() error { - cd, _, err := c.e.GetClusterData() + cd, _, err := c.e.GetClusterData(context.TODO()) if err != nil { return fmt.Errorf("cannot get cluster data: %v", err) } @@ -311,6 +312,10 @@ func (c *ClusterChecker) Start() error { checkCh := make(chan error) timerCh := time.NewTimer(0).C + // TODO(sgotti) TimeoutCecker is needed to forcefully close connection also + // if the Check method is blocked somewhere. + // The idomatic/cleaner solution will be to use a context instead of this + // TimeoutChecker but we have to change the libkv stores to support contexts. go c.TimeoutChecker(checkOkCh) for true { diff --git a/cmd/sentinel/sentinel.go b/cmd/sentinel/sentinel.go index a2b3b83..88e0d38 100644 --- a/cmd/sentinel/sentinel.go +++ b/cmd/sentinel/sentinel.go @@ -40,7 +40,6 @@ import ( "github.com/sorintlab/stolon/pkg/util" "github.com/davecgh/go-spew/spew" - "github.com/docker/leadership" "github.com/mitchellh/copystructure" "github.com/spf13/cobra" "go.uber.org/zap" @@ -96,7 +95,7 @@ func die(format string, a ...interface{}) { func (s *Sentinel) electionLoop() { for { log.Infow("Trying to acquire sentinels leadership") - electedCh, errCh := s.candidate.RunForElection() + electedCh, errCh := s.election.RunForElection() for { select { case elected := <-electedCh: @@ -120,6 +119,7 @@ func (s *Sentinel) electionLoop() { goto end case <-s.stop: log.Debugw("stopping election loop") + s.election.Stop() return } } @@ -136,22 +136,18 @@ func (s *Sentinel) syncRepl(spec *cluster.ClusterSpec) bool { return *spec.SynchronousReplication && *spec.Role == cluster.ClusterRoleMaster } -func (s *Sentinel) setSentinelInfo(ttl time.Duration) error { +func (s *Sentinel) setSentinelInfo(ctx context.Context, ttl time.Duration) error { sentinelInfo := &cluster.SentinelInfo{ UID: s.uid, } log.Debugw("sentinelInfo dump", "sentinelInfo", sentinelInfo) - if err := s.e.SetSentinelInfo(sentinelInfo, ttl); err != nil { + if err := s.e.SetSentinelInfo(ctx, sentinelInfo, ttl); err != nil { return err } return nil } -func (s *Sentinel) getKeepersInfo(ctx context.Context) (cluster.KeepersInfo, error) { - return s.e.GetKeepersInfo() -} - func (s *Sentinel) SetKeeperError(uid string) { if _, ok := s.keeperErrorTimers[uid]; !ok { s.keeperErrorTimers[uid] = timer.Now() @@ -1443,11 +1439,11 @@ type DBConvergenceInfo struct { type Sentinel struct { uid string cfg *config - e *store.StoreManager + e *store.Store - candidate *leadership.Candidate - stop chan bool - end chan bool + election store.Election + stop chan bool + end chan bool lastLeadershipCount uint @@ -1493,7 +1489,7 @@ func NewSentinel(uid string, cfg *config, stop chan bool, end chan bool) (*Senti storePath := filepath.Join(common.StoreBasePath, cfg.ClusterName) - kvstore, err := store.NewStore(store.Config{ + kvstore, err := store.NewKVStore(store.Config{ Backend: store.Backend(cfg.StoreBackend), Endpoints: cfg.StoreEndpoints, CertFile: cfg.StoreCertFile, @@ -1504,15 +1500,15 @@ func NewSentinel(uid string, cfg *config, stop chan bool, end chan bool) (*Senti if err != nil { return nil, fmt.Errorf("cannot create store: %v", err) } - e := store.NewStoreManager(kvstore, storePath) + e := store.NewStore(kvstore, storePath) - candidate := leadership.NewCandidate(kvstore, filepath.Join(storePath, common.SentinelLeaderKey), uid, store.MinTTL) + election := store.NewElection(kvstore, filepath.Join(storePath, common.SentinelLeaderKey), uid) return &Sentinel{ uid: uid, cfg: cfg, e: e, - candidate: candidate, + election: election, leader: false, initialClusterSpec: initialClusterSpec, stop: stop, @@ -1539,9 +1535,8 @@ func (s *Sentinel) Start() { for true { select { case <-s.stop: - log.Debugw("stopping stolon sentinel") + log.Infow("stopping stolon sentinel") cancel() - s.candidate.Stop() s.end <- true return case <-timerCh: @@ -1566,7 +1561,7 @@ func (s *Sentinel) clusterSentinelCheck(pctx context.Context) { defer s.updateMutex.Unlock() e := s.e - cd, prevCDPair, err := e.GetClusterData() + cd, prevCDPair, err := e.GetClusterData(pctx) if err != nil { log.Errorw("error retrieving cluster data", zap.Error(err)) return @@ -1599,27 +1594,25 @@ func (s *Sentinel) clusterSentinelCheck(pctx context.Context) { log.Infow("writing initial cluster data") newcd := cluster.NewClusterData(c) log.Debugf("newcd dump: %s", spew.Sdump(newcd)) - if _, err = e.AtomicPutClusterData(newcd, nil); err != nil { + if _, err = e.AtomicPutClusterData(pctx, newcd, nil); err != nil { log.Errorw("error saving cluster data", zap.Error(err)) } return } - if err = s.setSentinelInfo(2 * s.sleepInterval); err != nil { + if err = s.setSentinelInfo(pctx, 2*s.sleepInterval); err != nil { log.Errorw("cannot update sentinel info", zap.Error(err)) return } - ctx, cancel := context.WithTimeout(pctx, s.requestTimeout) - keepersInfo, err := s.getKeepersInfo(ctx) - cancel() + keepersInfo, err := s.e.GetKeepersInfo(pctx) if err != nil { log.Errorw("cannot get keepers info", zap.Error(err)) return } log.Debugf("keepersInfo dump: %s", spew.Sdump(keepersInfo)) - proxiesInfo, err := s.e.GetProxiesInfo() + proxiesInfo, err := s.e.GetProxiesInfo(pctx) if err != nil { log.Errorw("failed to get proxies info", zap.Error(err)) return @@ -1661,7 +1654,7 @@ func (s *Sentinel) clusterSentinelCheck(pctx context.Context) { log.Debugf("newcd dump after updateCluster: %s", spew.Sdump(newcd)) if newcd != nil { - if _, err := e.AtomicPutClusterData(newcd, prevCDPair); err != nil { + if _, err := e.AtomicPutClusterData(pctx, newcd, prevCDPair); err != nil { log.Errorw("error saving clusterdata", zap.Error(err)) } } diff --git a/cmd/stolonctl/clusterdata.go b/cmd/stolonctl/clusterdata.go index cdeb38d..c786fbb 100644 --- a/cmd/stolonctl/clusterdata.go +++ b/cmd/stolonctl/clusterdata.go @@ -39,11 +39,13 @@ func init() { } func clusterdata(cmd *cobra.Command, args []string) { - e, err := NewStore() + kvStore, err := NewKVStore() if err != nil { die("cannot create store: %v", err) } + e := NewStore(kvStore) + cd, _, err := getClusterData(e) if err != nil { die("%v", err) diff --git a/cmd/stolonctl/init.go b/cmd/stolonctl/init.go index a0b1e77..138bcd4 100644 --- a/cmd/stolonctl/init.go +++ b/cmd/stolonctl/init.go @@ -15,6 +15,7 @@ package main import ( + "context" "encoding/json" "io/ioutil" "os" @@ -70,12 +71,14 @@ func initCluster(cmd *cobra.Command, args []string) { } } - e, err := NewStore() + kvStore, err := NewKVStore() if err != nil { die("cannot create store: %v", err) } - cd, _, err := e.GetClusterData() + e := NewStore(kvStore) + + cd, _, err := e.GetClusterData(context.TODO()) if err != nil { die("cannot get cluster data: %v", err) } @@ -96,7 +99,7 @@ func initCluster(cmd *cobra.Command, args []string) { os.Exit(0) } - cd, _, err = e.GetClusterData() + cd, _, err = e.GetClusterData(context.TODO()) if err != nil { die("cannot get cluster data: %v", err) } @@ -120,7 +123,7 @@ func initCluster(cmd *cobra.Command, args []string) { cd = cluster.NewClusterData(c) // We ignore if cd has been modified between reading and writing - if err := e.PutClusterData(cd); err != nil { + if err := e.PutClusterData(context.TODO(), cd); err != nil { die("cannot update cluster data: %v", err) } } diff --git a/cmd/stolonctl/promote.go b/cmd/stolonctl/promote.go index 9dbe7f5..5bc73c1 100644 --- a/cmd/stolonctl/promote.go +++ b/cmd/stolonctl/promote.go @@ -15,6 +15,7 @@ package main import ( + "context" "os" "github.com/sorintlab/stolon/pkg/cluster" @@ -40,11 +41,13 @@ func promote(cmd *cobra.Command, args []string) { die("too many arguments") } - e, err := NewStore() + kvStore, err := NewKVStore() if err != nil { die("cannot create store: %v", err) } + e := NewStore(kvStore) + accepted := true if !initOpts.forceYes { accepted, err = askConfirmation("Are you sure you want to continue? [yes/no] ") @@ -82,7 +85,7 @@ func promote(cmd *cobra.Command, args []string) { } // retry if cd has been modified between reading and writing - _, err = e.AtomicPutClusterData(cd, pair) + _, err = e.AtomicPutClusterData(context.TODO(), cd, pair) if err != nil { if err == libkvstore.ErrKeyModified { retry++ diff --git a/cmd/stolonctl/removekeeper.go b/cmd/stolonctl/removekeeper.go index ec00e05..9c1fd2e 100644 --- a/cmd/stolonctl/removekeeper.go +++ b/cmd/stolonctl/removekeeper.go @@ -15,6 +15,8 @@ package main import ( + "context" + "github.com/sorintlab/stolon/pkg/cluster" "github.com/spf13/cobra" ) @@ -40,11 +42,13 @@ func removeKeeper(cmd *cobra.Command, args []string) { keeperID := args[0] - store, err := NewStore() + kvStore, err := NewKVStore() if err != nil { die("cannot create store: %v", err) } + store := NewStore(kvStore) + cd, pair, err := getClusterData(store) if err != nil { die("cannot get cluster data: %v", err) @@ -78,7 +82,7 @@ func removeKeeper(cmd *cobra.Command, args []string) { // NOTE: if the removed db is listed inside another db.Followers it'll will // be cleaned by the sentinels - _, err = store.AtomicPutClusterData(newCd, pair) + _, err = store.AtomicPutClusterData(context.TODO(), newCd, pair) if err != nil { die("cannot update cluster data: %v", err) } diff --git a/cmd/stolonctl/spec.go b/cmd/stolonctl/spec.go index 48a2eca..d035476 100644 --- a/cmd/stolonctl/spec.go +++ b/cmd/stolonctl/spec.go @@ -39,11 +39,13 @@ func init() { } func spec(cmd *cobra.Command, args []string) { - e, err := NewStore() + kvStore, err := NewKVStore() if err != nil { die("cannot create store: %v", err) } + e := NewStore(kvStore) + cd, _, err := getClusterData(e) if err != nil { die("%v", err) diff --git a/cmd/stolonctl/status.go b/cmd/stolonctl/status.go index 934f524..24a8fbf 100644 --- a/cmd/stolonctl/status.go +++ b/cmd/stolonctl/status.go @@ -15,12 +15,14 @@ package main import ( + "context" "fmt" "os" "sort" "text/tabwriter" "github.com/sorintlab/stolon/pkg/cluster" + "github.com/sorintlab/stolon/pkg/store" "github.com/spf13/cobra" ) @@ -84,19 +86,22 @@ func status(cmd *cobra.Command, args []string) { tabOut := new(tabwriter.Writer) tabOut.Init(os.Stdout, 0, 8, 1, '\t', 0) - e, err := NewStore() + kvStore, err := NewKVStore() if err != nil { die("cannot create store: %v", err) } - sentinelsInfo, err := e.GetSentinelsInfo() + e := NewStore(kvStore) + + sentinelsInfo, err := e.GetSentinelsInfo(context.TODO()) if err != nil { die("cannot get sentinels info: %v", err) } - lsid, err := e.GetLeaderSentinelId() - if err != nil { - die("cannot get leader sentinel info") + election := NewElection(kvStore) + lsid, err := election.Leader() + if err != nil && err != store.ErrElectionNoLeader { + die("cannot get leader sentinel info: %v", err) } stdout("=== Active sentinels ===") @@ -118,7 +123,7 @@ func status(cmd *cobra.Command, args []string) { } } - proxiesInfo, err := e.GetProxiesInfo() + proxiesInfo, err := e.GetProxiesInfo(context.TODO()) if err != nil { die("cannot get proxies info: %v", err) } diff --git a/cmd/stolonctl/stolonctl.go b/cmd/stolonctl/stolonctl.go index 0260e4a..45e598a 100644 --- a/cmd/stolonctl/stolonctl.go +++ b/cmd/stolonctl/stolonctl.go @@ -16,6 +16,7 @@ package main import ( "bufio" + "context" "fmt" "os" "path/filepath" @@ -27,7 +28,6 @@ import ( "github.com/sorintlab/stolon/pkg/flagutil" "github.com/sorintlab/stolon/pkg/store" - kvstore "github.com/docker/libkv/store" "github.com/spf13/cobra" ) @@ -95,10 +95,8 @@ func die(format string, a ...interface{}) { os.Exit(1) } -func NewStore() (*store.StoreManager, error) { - storePath := filepath.Join(common.StoreBasePath, cfg.ClusterName) - - kvstore, err := store.NewStore(store.Config{ +func NewKVStore() (store.KVStore, error) { + return store.NewKVStore(store.Config{ Backend: store.Backend(cfg.StoreBackend), Endpoints: cfg.StoreEndpoints, CertFile: cfg.StoreCertFile, @@ -106,14 +104,20 @@ func NewStore() (*store.StoreManager, error) { CAFile: cfg.StoreCAFile, SkipTLSVerify: cfg.StoreSkipTlsVerify, }) - if err != nil { - return nil, fmt.Errorf("cannot create store: %v", err) - } - return store.NewStoreManager(kvstore, storePath), nil } -func getClusterData(e *store.StoreManager) (*cluster.ClusterData, *kvstore.KVPair, error) { - cd, pair, err := e.GetClusterData() +func NewStore(kvStore store.KVStore) *store.Store { + storePath := filepath.Join(common.StoreBasePath, cfg.ClusterName) + return store.NewStore(kvStore, storePath) +} + +func NewElection(kvStore store.KVStore) store.Election { + storePath := filepath.Join(common.StoreBasePath, cfg.ClusterName) + return store.NewElection(kvStore, filepath.Join(storePath, common.SentinelLeaderKey), "") +} + +func getClusterData(e *store.Store) (*cluster.ClusterData, *store.KVPair, error) { + cd, pair, err := e.GetClusterData(context.TODO()) if err != nil { return nil, nil, fmt.Errorf("cannot get cluster data: %v", err) } diff --git a/cmd/stolonctl/update.go b/cmd/stolonctl/update.go index 066be40..f603fd7 100644 --- a/cmd/stolonctl/update.go +++ b/cmd/stolonctl/update.go @@ -15,6 +15,7 @@ package main import ( + "context" "encoding/json" "fmt" "io/ioutil" @@ -93,11 +94,13 @@ func update(cmd *cobra.Command, args []string) { } } - e, err := NewStore() + kvStore, err := NewKVStore() if err != nil { die("cannot create store: %v", err) } + e := NewStore(kvStore) + retry := 0 for retry < maxRetries { cd, pair, err := getClusterData(e) @@ -127,7 +130,7 @@ func update(cmd *cobra.Command, args []string) { } // retry if cd has been modified between reading and writing - _, err = e.AtomicPutClusterData(cd, pair) + _, err = e.AtomicPutClusterData(context.TODO(), cd, pair) if err != nil { if err == libkvstore.ErrKeyModified { retry++ diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index b0d6ad5..5a3e06e 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -43,6 +43,7 @@ const ( ) const ( + DefaultStoreTimeout = 5 * time.Second DefaultProxyCheckInterval = 5 * time.Second DefaultProxyTimeoutInterval = 15 * time.Second diff --git a/pkg/store/etcdv3.go b/pkg/store/etcdv3.go new file mode 100644 index 0000000..135782f --- /dev/null +++ b/pkg/store/etcdv3.go @@ -0,0 +1,226 @@ +// Copyright 2017 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. + +package store + +import ( + "context" + "time" + + etcdclientv3 "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/clientv3/concurrency" + "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" +) + +func fromEtcV3Error(err error) error { + switch err { + case rpctypes.ErrKeyNotFound: + return ErrKeyNotFound + case concurrency.ErrElectionNoLeader: + return ErrElectionNoLeader + } + return err +} + +type etcdV3Store struct { + c *etcdclientv3.Client + requestTimeout time.Duration +} + +func (s *etcdV3Store) Put(pctx context.Context, key string, value []byte, options *WriteOptions) error { + etcdv3Options := []etcdclientv3.OpOption{} + if options != nil { + if options.TTL > 0 { + ctx, cancel := context.WithTimeout(pctx, s.requestTimeout) + lease, err := s.c.Grant(ctx, int64(options.TTL.Seconds())) + cancel() + if err != nil { + return err + } + etcdv3Options = append(etcdv3Options, etcdclientv3.WithLease(lease.ID)) + } + } + ctx, cancel := context.WithTimeout(pctx, s.requestTimeout) + _, err := s.c.Put(ctx, key, string(value), etcdv3Options...) + cancel() + return fromLibKVStoreErr(err) +} + +func (s *etcdV3Store) Get(pctx context.Context, key string) (*KVPair, error) { + ctx, cancel := context.WithTimeout(pctx, s.requestTimeout) + resp, err := s.c.Get(ctx, key) + cancel() + if err != nil { + return nil, fromEtcV3Error(err) + } + if len(resp.Kvs) == 0 { + return nil, ErrKeyNotFound + } + kv := resp.Kvs[0] + return &KVPair{Key: string(kv.Key), Value: kv.Value, LastIndex: uint64(kv.ModRevision)}, nil +} + +func (s *etcdV3Store) List(pctx context.Context, directory string) ([]*KVPair, error) { + ctx, cancel := context.WithTimeout(pctx, s.requestTimeout) + resp, err := s.c.Get(ctx, directory, etcdclientv3.WithPrefix()) + cancel() + if err != nil { + return nil, fromEtcV3Error(err) + } + kvPairs := make([]*KVPair, len(resp.Kvs)) + for i, kv := range resp.Kvs { + kvPairs[i] = &KVPair{Key: string(kv.Key), Value: kv.Value, LastIndex: uint64(kv.ModRevision)} + } + return kvPairs, nil +} + +func (s *etcdV3Store) AtomicPut(pctx context.Context, key string, value []byte, previous *KVPair, options *WriteOptions) (*KVPair, error) { + etcdv3Options := []etcdclientv3.OpOption{} + if options != nil { + if options.TTL > 0 { + ctx, cancel := context.WithTimeout(pctx, s.requestTimeout) + lease, err := s.c.Grant(ctx, int64(options.TTL)) + cancel() + if err != nil { + return nil, err + } + etcdv3Options = append(etcdv3Options, etcdclientv3.WithLease(lease.ID)) + } + } + var cmp etcdclientv3.Cmp + if previous != nil { + cmp = etcdclientv3.Compare(etcdclientv3.ModRevision(key), "=", int64(previous.LastIndex)) + } else { + // key doens't exists + cmp = etcdclientv3.Compare(etcdclientv3.CreateRevision(key), "=", 0) + } + ctx, cancel := context.WithTimeout(pctx, s.requestTimeout) + txn := s.c.Txn(ctx).If(cmp) + txn = txn.Then(etcdclientv3.OpPut(key, string(value), etcdv3Options...)) + tresp, err := txn.Commit() + cancel() + if err != nil { + return nil, fromEtcV3Error(err) + } + if !tresp.Succeeded { + return nil, ErrKeyModified + } + revision := tresp.Responses[0].GetResponsePut().Header.Revision + return &KVPair{Key: key, Value: value, LastIndex: uint64(revision)}, nil +} + +func (s *etcdV3Store) Delete(pctx context.Context, key string) error { + ctx, cancel := context.WithTimeout(pctx, s.requestTimeout) + _, err := s.c.Delete(ctx, key) + cancel() + return fromEtcV3Error(err) +} + +func (s *etcdV3Store) Close() error { + return s.c.Close() +} + +type etcdv3Election struct { + c *etcdclientv3.Client + path string + candidateUID string + ttl time.Duration + + requestTimeout time.Duration + + running bool + + electedCh chan bool + errCh chan error + + ctx context.Context + cancel context.CancelFunc +} + +func (e *etcdv3Election) RunForElection() (<-chan bool, <-chan error) { + if e.running { + panic("already running") + } + + e.electedCh = make(chan bool) + e.errCh = make(chan error) + e.ctx, e.cancel = context.WithCancel(context.Background()) + + e.running = true + go e.campaign() + + return e.electedCh, e.errCh +} + +func (e *etcdv3Election) Stop() { + if !e.running { + panic("not running") + } + e.cancel() + e.running = false +} + +func (e *etcdv3Election) Leader() (string, error) { + s, err := concurrency.NewSession(e.c, concurrency.WithTTL(int(e.ttl.Seconds()))) + if err != nil { + return "", fromEtcV3Error(err) + } + defer s.Close() + + etcdElection := concurrency.NewElection(s, e.path) + + ctx, cancel := context.WithTimeout(context.Background(), e.requestTimeout) + resp, err := etcdElection.Leader(ctx) + cancel() + if err != nil { + return "", fromEtcV3Error(err) + } + leader := string(resp.Kvs[0].Value) + + return leader, nil +} + +func (e *etcdv3Election) campaign() { + defer close(e.electedCh) + defer close(e.errCh) + + for { + e.electedCh <- false + s, err := concurrency.NewSession(e.c, concurrency.WithTTL(int(e.ttl.Seconds())), concurrency.WithContext(e.ctx)) + if err != nil { + e.running = false + e.errCh <- err + return + } + + etcdElection := concurrency.NewElection(s, e.path) + if err = etcdElection.Campaign(e.ctx, e.candidateUID); err != nil { + e.running = false + e.errCh <- err + return + } + + e.electedCh <- true + + select { + case <-e.ctx.Done(): + e.running = false + etcdElection.Resign(context.TODO()) + return + case <-s.Done(): + etcdElection.Resign(context.TODO()) + e.electedCh <- false + } + } +} diff --git a/pkg/store/libkv.go b/pkg/store/libkv.go new file mode 100644 index 0000000..1df2493 --- /dev/null +++ b/pkg/store/libkv.go @@ -0,0 +1,120 @@ +// Copyright 2017 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. + +package store + +import ( + "context" + + "github.com/docker/leadership" + libkvstore "github.com/docker/libkv/store" + "github.com/docker/libkv/store/consul" + "github.com/docker/libkv/store/etcd" +) + +func init() { + etcd.Register() + consul.Register() +} + +func fromLibKVStoreErr(err error) error { + switch err { + case libkvstore.ErrKeyNotFound: + return ErrKeyNotFound + } + return err +} + +type libKVStore struct { + store libkvstore.Store +} + +func (s *libKVStore) Put(ctx context.Context, key string, value []byte, options *WriteOptions) error { + var libkvOptions *libkvstore.WriteOptions + if options != nil { + libkvOptions = &libkvstore.WriteOptions{TTL: options.TTL} + } + err := s.store.Put(key, value, libkvOptions) + return fromLibKVStoreErr(err) +} + +func (s *libKVStore) Get(ctx context.Context, key string) (*KVPair, error) { + pair, err := s.store.Get(key) + if err != nil { + return nil, fromLibKVStoreErr(err) + } + return &KVPair{Key: pair.Key, Value: pair.Value, LastIndex: pair.LastIndex}, nil +} + +func (s *libKVStore) List(ctx context.Context, directory string) ([]*KVPair, error) { + pairs, err := s.store.List(directory) + if err != nil { + return nil, fromLibKVStoreErr(err) + } + kvPairs := make([]*KVPair, len(pairs)) + for i, p := range pairs { + kvPairs[i] = &KVPair{Key: p.Key, Value: p.Value, LastIndex: p.LastIndex} + } + return kvPairs, nil +} + +func (s *libKVStore) AtomicPut(ctx context.Context, key string, value []byte, previous *KVPair, options *WriteOptions) (*KVPair, error) { + var libkvPrevious *libkvstore.KVPair + if previous != nil { + libkvPrevious = &libkvstore.KVPair{Key: previous.Key, LastIndex: previous.LastIndex} + } + var libkvOptions *libkvstore.WriteOptions + if options != nil { + libkvOptions = &libkvstore.WriteOptions{TTL: options.TTL} + } + _, pair, err := s.store.AtomicPut(key, value, libkvPrevious, libkvOptions) + if err != nil { + return nil, fromLibKVStoreErr(err) + } + return &KVPair{Key: pair.Key, Value: pair.Value, LastIndex: pair.LastIndex}, nil +} + +func (s *libKVStore) Delete(ctx context.Context, key string) error { + return fromLibKVStoreErr(s.store.Delete(key)) +} + +func (s *libKVStore) Close() error { + s.store.Close() + return nil +} + +type libkvElection struct { + store *libKVStore + path string + candidate *leadership.Candidate +} + +func (e *libkvElection) RunForElection() (<-chan bool, <-chan error) { + return e.candidate.RunForElection() +} + +func (e *libkvElection) Stop() { + e.candidate.Stop() +} + +func (e *libkvElection) Leader() (string, error) { + pair, err := e.store.Get(context.TODO(), e.path) + if err != nil { + if err != ErrKeyNotFound { + return "", err + } + return "", nil + } + return string(pair.Value), nil +} diff --git a/pkg/store/store.go b/pkg/store/store.go index 77aeb27..b0419f2 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -15,8 +15,10 @@ package store import ( + "context" "crypto/tls" "encoding/json" + "errors" "fmt" "net/url" "path/filepath" @@ -27,23 +29,19 @@ import ( "github.com/sorintlab/stolon/common" "github.com/sorintlab/stolon/pkg/cluster" + etcdclientv3 "github.com/coreos/etcd/clientv3" + "github.com/docker/leadership" "github.com/docker/libkv" - kvstore "github.com/docker/libkv/store" - "github.com/docker/libkv/store/consul" - "github.com/docker/libkv/store/etcd" + libkvstore "github.com/docker/libkv/store" ) -func init() { - etcd.Register() - consul.Register() -} - // Backend represents a KV Store Backend type Backend string const ( CONSUL Backend = "consul" - ETCD Backend = "etcd" + ETCDV2 Backend = "etcdv2" + ETCDV3 Backend = "etcdv3" ) const ( @@ -61,6 +59,13 @@ const ( DefaultConsulEndpoints = "http://127.0.0.1:8500" ) +var ( + // ErrKeyNotFound is thrown when the key is not found in the store during a Get operation + ErrKeyNotFound = errors.New("Key not found in store") + ErrKeyModified = errors.New("Unable to complete atomic operation, key modified") + ErrElectionNoLeader = errors.New("election: no leader") +) + const ( //TODO(sgotti) fix this in libkv? // consul min ttl is 10s and libkv divides this by 2 @@ -78,18 +83,45 @@ type Config struct { SkipTLSVerify bool } -type StoreManager struct { - clusterPath string - store kvstore.Store +// KVPair represents {Key, Value, Lastindex} tuple +type KVPair struct { + Key string + Value []byte + LastIndex uint64 } -func NewStore(cfg Config) (kvstore.Store, error) { - var kvBackend kvstore.Backend +type WriteOptions struct { + TTL time.Duration +} + +type KVStore interface { + // Put a value at the specified key + Put(ctx context.Context, key string, value []byte, options *WriteOptions) error + + // Get a value given its key + Get(ctx context.Context, key string) (*KVPair, error) + + // List the content of a given prefix + List(ctx context.Context, directory string) ([]*KVPair, error) + + // Atomic CAS operation on a single value. + // Pass previous = nil to create a new key. + AtomicPut(ctx context.Context, key string, value []byte, previous *KVPair, options *WriteOptions) (*KVPair, error) + + Delete(ctx context.Context, key string) error + + // Close the store connection + Close() error +} + +func NewKVStore(cfg Config) (KVStore, error) { + var kvBackend libkvstore.Backend switch cfg.Backend { case CONSUL: - kvBackend = kvstore.CONSUL - case ETCD: - kvBackend = kvstore.ETCD + kvBackend = libkvstore.CONSUL + case ETCDV2: + kvBackend = libkvstore.ETCD + case ETCDV3: default: return nil, fmt.Errorf("Unknown store backend: %q", cfg.Backend) } @@ -99,7 +131,7 @@ func NewStore(cfg Config) (kvstore.Store, error) { switch cfg.Backend { case CONSUL: endpointsStr = DefaultConsulEndpoints - case ETCD: + case ETCDV2, ETCDV3: endpointsStr = DefaultEtcdEndpoints } } @@ -147,59 +179,108 @@ func NewStore(cfg Config) (kvstore.Store, error) { } } - config := &kvstore.Config{ - TLS: tlsConfig, - ConnectionTimeout: 10 * time.Second, - } + switch cfg.Backend { + case CONSUL, ETCDV2: + config := &libkvstore.Config{ + TLS: tlsConfig, + ConnectionTimeout: cluster.DefaultStoreTimeout, + } - store, err := libkv.NewStore(kvBackend, addrs, config) - if err != nil { - return nil, err + store, err := libkv.NewStore(kvBackend, addrs, config) + if err != nil { + return nil, err + } + return &libKVStore{store: store}, nil + case ETCDV3: + config := etcdclientv3.Config{ + Endpoints: addrs, + TLS: tlsConfig, + } + + c, err := etcdclientv3.New(config) + if err != nil { + return nil, err + } + return &etcdV3Store{c: c, requestTimeout: cluster.DefaultStoreTimeout}, nil + default: + return nil, fmt.Errorf("Unknown store backend: %q", cfg.Backend) } - return store, nil } -func NewStoreManager(kvStore kvstore.Store, path string) *StoreManager { - return &StoreManager{ +type Election interface { + // TODO(sgotti) this mimics the current docker/leadership API and the etcdv3 + // implementations adapt to it. In future it could be replaced with a better + // api like the current one implemented by etcdclientv3/concurrency. + RunForElection() (<-chan bool, <-chan error) + Leader() (string, error) + Stop() +} + +func NewElection(kvStore KVStore, path, candidateUID string) Election { + switch kvStore.(type) { + case *libKVStore: + s := kvStore.(*libKVStore) + electionPath := filepath.Join(path, common.SentinelLeaderKey) + candidate := leadership.NewCandidate(s.store, electionPath, candidateUID, MinTTL) + return &libkvElection{store: s, path: electionPath, candidate: candidate} + case *etcdV3Store: + etcdV3Store := kvStore.(*etcdV3Store) + return &etcdv3Election{ + c: etcdV3Store.c, + path: path, + candidateUID: candidateUID, + ttl: MinTTL, + } + default: + panic("unknown kvstore") + } +} + +type Store struct { + clusterPath string + store KVStore +} + +func NewStore(kvStore KVStore, path string) *Store { + return &Store{ clusterPath: path, store: kvStore, } } -func (e *StoreManager) AtomicPutClusterData(cd *cluster.ClusterData, previous *kvstore.KVPair) (*kvstore.KVPair, error) { +func (s *Store) AtomicPutClusterData(ctx context.Context, cd *cluster.ClusterData, previous *KVPair) (*KVPair, error) { cdj, err := json.Marshal(cd) if err != nil { return nil, err } - path := filepath.Join(e.clusterPath, clusterDataFile) + path := filepath.Join(s.clusterPath, clusterDataFile) // Skip prev Value since LastIndex is enough for a CAS and it gives // problem with etcd v2 api with big prev values. - var prev *kvstore.KVPair + var prev *KVPair if previous != nil { - prev = &kvstore.KVPair{ + prev = &KVPair{ Key: previous.Key, LastIndex: previous.LastIndex, } } - _, pair, err := e.store.AtomicPut(path, cdj, prev, nil) - return pair, err + return s.store.AtomicPut(ctx, path, cdj, prev, nil) } -func (e *StoreManager) PutClusterData(cd *cluster.ClusterData) error { +func (s *Store) PutClusterData(ctx context.Context, cd *cluster.ClusterData) error { cdj, err := json.Marshal(cd) if err != nil { return err } - path := filepath.Join(e.clusterPath, clusterDataFile) - return e.store.Put(path, cdj, nil) + path := filepath.Join(s.clusterPath, clusterDataFile) + return s.store.Put(ctx, path, cdj, nil) } -func (e *StoreManager) GetClusterData() (*cluster.ClusterData, *kvstore.KVPair, error) { +func (s *Store) GetClusterData(ctx context.Context) (*cluster.ClusterData, *KVPair, error) { var cd *cluster.ClusterData - path := filepath.Join(e.clusterPath, clusterDataFile) - pair, err := e.store.Get(path) + path := filepath.Join(s.clusterPath, clusterDataFile) + pair, err := s.store.Get(ctx, path) if err != nil { - if err != kvstore.ErrKeyNotFound { + if err != ErrKeyNotFound { return nil, nil, err } return nil, nil, nil @@ -210,7 +291,7 @@ func (e *StoreManager) GetClusterData() (*cluster.ClusterData, *kvstore.KVPair, return cd, pair, nil } -func (e *StoreManager) SetKeeperInfo(id string, ms *cluster.KeeperInfo, ttl time.Duration) error { +func (s *Store) SetKeeperInfo(ctx context.Context, id string, ms *cluster.KeeperInfo, ttl time.Duration) error { msj, err := json.Marshal(ms) if err != nil { return err @@ -218,17 +299,17 @@ func (e *StoreManager) SetKeeperInfo(id string, ms *cluster.KeeperInfo, ttl time if ttl < MinTTL { ttl = MinTTL } - return e.store.Put(filepath.Join(e.clusterPath, keepersInfoDir, id), msj, &kvstore.WriteOptions{TTL: ttl}) + return s.store.Put(ctx, filepath.Join(s.clusterPath, keepersInfoDir, id), msj, &WriteOptions{TTL: ttl}) } -func (e *StoreManager) GetKeeperInfo(id string) (*cluster.KeeperInfo, bool, error) { +func (s *Store) GetKeeperInfo(ctx context.Context, id string) (*cluster.KeeperInfo, bool, error) { if id == "" { return nil, false, fmt.Errorf("empty keeper id") } var keeper cluster.KeeperInfo - pair, err := e.store.Get(filepath.Join(e.clusterPath, keepersInfoDir, id)) + pair, err := s.store.Get(ctx, filepath.Join(s.clusterPath, keepersInfoDir, id)) if err != nil { - if err != kvstore.ErrKeyNotFound { + if err != ErrKeyNotFound { return nil, false, err } return nil, false, nil @@ -239,11 +320,11 @@ func (e *StoreManager) GetKeeperInfo(id string) (*cluster.KeeperInfo, bool, erro return &keeper, true, nil } -func (e *StoreManager) GetKeepersInfo() (cluster.KeepersInfo, error) { +func (s *Store) GetKeepersInfo(ctx context.Context) (cluster.KeepersInfo, error) { keepers := cluster.KeepersInfo{} - pairs, err := e.store.List(filepath.Join(e.clusterPath, keepersInfoDir)) + pairs, err := s.store.List(ctx, filepath.Join(s.clusterPath, keepersInfoDir)) if err != nil { - if err != kvstore.ErrKeyNotFound { + if err != ErrKeyNotFound { return nil, err } return keepers, nil @@ -259,7 +340,7 @@ func (e *StoreManager) GetKeepersInfo() (cluster.KeepersInfo, error) { return keepers, nil } -func (e *StoreManager) SetSentinelInfo(si *cluster.SentinelInfo, ttl time.Duration) error { +func (s *Store) SetSentinelInfo(ctx context.Context, si *cluster.SentinelInfo, ttl time.Duration) error { sij, err := json.Marshal(si) if err != nil { return err @@ -267,17 +348,17 @@ func (e *StoreManager) SetSentinelInfo(si *cluster.SentinelInfo, ttl time.Durati if ttl < MinTTL { ttl = MinTTL } - return e.store.Put(filepath.Join(e.clusterPath, sentinelsInfoDir, si.UID), sij, &kvstore.WriteOptions{TTL: ttl}) + return s.store.Put(ctx, filepath.Join(s.clusterPath, sentinelsInfoDir, si.UID), sij, &WriteOptions{TTL: ttl}) } -func (e *StoreManager) GetSentinelInfo(id string) (*cluster.SentinelInfo, bool, error) { +func (s *Store) GetSentinelInfo(ctx context.Context, id string) (*cluster.SentinelInfo, bool, error) { if id == "" { return nil, false, fmt.Errorf("empty sentinel id") } var si cluster.SentinelInfo - pair, err := e.store.Get(filepath.Join(e.clusterPath, sentinelsInfoDir, id)) + pair, err := s.store.Get(ctx, filepath.Join(s.clusterPath, sentinelsInfoDir, id)) if err != nil { - if err != kvstore.ErrKeyNotFound { + if err != ErrKeyNotFound { return nil, false, err } return nil, false, nil @@ -289,11 +370,11 @@ func (e *StoreManager) GetSentinelInfo(id string) (*cluster.SentinelInfo, bool, return &si, true, nil } -func (e *StoreManager) GetSentinelsInfo() (cluster.SentinelsInfo, error) { +func (s *Store) GetSentinelsInfo(ctx context.Context) (cluster.SentinelsInfo, error) { ssi := cluster.SentinelsInfo{} - pairs, err := e.store.List(filepath.Join(e.clusterPath, sentinelsInfoDir)) + pairs, err := s.store.List(ctx, filepath.Join(s.clusterPath, sentinelsInfoDir)) if err != nil { - if err != kvstore.ErrKeyNotFound { + if err != ErrKeyNotFound { return nil, err } return ssi, nil @@ -309,10 +390,10 @@ func (e *StoreManager) GetSentinelsInfo() (cluster.SentinelsInfo, error) { return ssi, nil } -func (e *StoreManager) GetLeaderSentinelId() (string, error) { - pair, err := e.store.Get(filepath.Join(e.clusterPath, sentinelLeaderKey)) +func (s *Store) GetLeaderSentinelId(ctx context.Context) (string, error) { + pair, err := s.store.Get(ctx, filepath.Join(s.clusterPath, sentinelLeaderKey)) if err != nil { - if err != kvstore.ErrKeyNotFound { + if err != ErrKeyNotFound { return "", err } return "", nil @@ -320,7 +401,7 @@ func (e *StoreManager) GetLeaderSentinelId() (string, error) { return string(pair.Value), nil } -func (e *StoreManager) SetProxyInfo(pi *cluster.ProxyInfo, ttl time.Duration) error { +func (s *Store) SetProxyInfo(ctx context.Context, pi *cluster.ProxyInfo, ttl time.Duration) error { pij, err := json.Marshal(pi) if err != nil { return err @@ -328,17 +409,17 @@ func (e *StoreManager) SetProxyInfo(pi *cluster.ProxyInfo, ttl time.Duration) er if ttl < MinTTL { ttl = MinTTL } - return e.store.Put(filepath.Join(e.clusterPath, proxiesInfoDir, pi.UID), pij, &kvstore.WriteOptions{TTL: ttl}) + return s.store.Put(ctx, filepath.Join(s.clusterPath, proxiesInfoDir, pi.UID), pij, &WriteOptions{TTL: ttl}) } -func (e *StoreManager) GetProxyInfo(id string) (*cluster.ProxyInfo, bool, error) { +func (s *Store) GetProxyInfo(ctx context.Context, id string) (*cluster.ProxyInfo, bool, error) { if id == "" { return nil, false, fmt.Errorf("empty proxy id") } var pi cluster.ProxyInfo - pair, err := e.store.Get(filepath.Join(e.clusterPath, proxiesInfoDir, id)) + pair, err := s.store.Get(ctx, filepath.Join(s.clusterPath, proxiesInfoDir, id)) if err != nil { - if err != kvstore.ErrKeyNotFound { + if err != ErrKeyNotFound { return nil, false, err } return nil, false, nil @@ -350,11 +431,11 @@ func (e *StoreManager) GetProxyInfo(id string) (*cluster.ProxyInfo, bool, error) return &pi, true, nil } -func (e *StoreManager) GetProxiesInfo() (cluster.ProxiesInfo, error) { +func (s *Store) GetProxiesInfo(ctx context.Context) (cluster.ProxiesInfo, error) { psi := cluster.ProxiesInfo{} - pairs, err := e.store.List(filepath.Join(e.clusterPath, proxiesInfoDir)) + pairs, err := s.store.List(ctx, filepath.Join(s.clusterPath, proxiesInfoDir)) if err != nil { - if err != kvstore.ErrKeyNotFound { + if err != ErrKeyNotFound { return nil, err } return psi, nil diff --git a/scripts/semaphore.sh b/scripts/semaphore.sh index d6cad8e..22ac819 100755 --- a/scripts/semaphore.sh +++ b/scripts/semaphore.sh @@ -12,8 +12,8 @@ fi # Install etcd mkdir etcd pushd etcd -curl -L https://github.com/coreos/etcd/releases/download/v3.1.8/etcd-v3.1.8-linux-amd64.tar.gz -o etcd-v3.1.8-linux-amd64.tar.gz -tar xzvf etcd-v3.1.8-linux-amd64.tar.gz +curl -L https://github.com/coreos/etcd/releases/download/v3.2.11/etcd-v3.2.11-linux-amd64.tar.gz -o etcd-v3.2.11-linux-amd64.tar.gz +tar xzvf etcd-v3.2.11-linux-amd64.tar.gz popd # Install consul @@ -34,7 +34,7 @@ sudo apt-get -y install postgresql-9.5 postgresql-9.6 postgresql-10 sudo -E CGO_ENABLED=0 go install -a -installsuffix cgo std # Run tests -export ETCD_BIN="${PWD}/etcd/etcd-v3.1.8-linux-amd64/etcd" +export ETCD_BIN="${PWD}/etcd/etcd-v3.2.11-linux-amd64/etcd" export CONSUL_BIN="${PWD}/consul/consul" OLDPATH=$PATH diff --git a/test b/test index bd4dc74..edf21d6 100755 --- a/test +++ b/test @@ -97,7 +97,7 @@ if [ -n "$INTEGRATION" ]; then export STSENTINEL_BIN=${BINDIR}/stolon-sentinel export STPROXY_BIN=${BINDIR}/stolon-proxy export STCTL_BIN=${BINDIR}/stolonctl - if [ "${STOLON_TEST_STORE_BACKEND}" == "etcd" ]; then + if [ "${STOLON_TEST_STORE_BACKEND}" == "etcd" -o "${STOLON_TEST_STORE_BACKEND}" == "etcdv2" -o "${STOLON_TEST_STORE_BACKEND}" == "etcdv3" ]; then if [ -z ${ETCD_BIN} ]; then if [ -z $(which etcd) ]; then echo "cannot find etcd in PATH and ETCD_BIN environment variable not defined" diff --git a/tests/integration/config_test.go b/tests/integration/config_test.go index 6c9cb3e..7995c6f 100644 --- a/tests/integration/config_test.go +++ b/tests/integration/config_test.go @@ -55,7 +55,7 @@ func TestServerParameters(t *testing.T) { storePath := filepath.Join(common.StoreBasePath, clusterName) - sm := store.NewStoreManager(tstore.store, storePath) + sm := store.NewStore(tstore.store, storePath) initialClusterSpec := &cluster.ClusterSpec{ InitMode: cluster.ClusterInitModeP(cluster.ClusterInitModeNew), diff --git a/tests/integration/ha_test.go b/tests/integration/ha_test.go index 35e599b..9f835f4 100644 --- a/tests/integration/ha_test.go +++ b/tests/integration/ha_test.go @@ -15,6 +15,7 @@ package integration import ( + "context" "fmt" "io/ioutil" "os" @@ -70,7 +71,7 @@ func TestInitWithMultipleKeepers(t *testing.T) { storePath := filepath.Join(common.StoreBasePath, clusterName) - sm := store.NewStoreManager(tstore.store, storePath) + sm := store.NewStore(tstore.store, storePath) initialClusterSpec := &cluster.ClusterSpec{ InitMode: cluster.ClusterInitModeP(cluster.ClusterInitModeNew), @@ -271,7 +272,7 @@ func shutdown(tks map[string]*TestKeeper, tss map[string]*TestSentinel, tp *Test } } -func waitKeeperReady(t *testing.T, sm *store.StoreManager, keeper *TestKeeper) { +func waitKeeperReady(t *testing.T, sm *store.Store, keeper *TestKeeper) { if err := WaitClusterDataKeeperInitialized(keeper.uid, sm, 60*time.Second); err != nil { t.Fatalf("unexpected err: %v", err) } @@ -280,7 +281,7 @@ func waitKeeperReady(t *testing.T, sm *store.StoreManager, keeper *TestKeeper) { } } -func waitMasterStandbysReady(t *testing.T, sm *store.StoreManager, tks testKeepers) (master *TestKeeper, standbys []*TestKeeper) { +func waitMasterStandbysReady(t *testing.T, sm *store.Store, tks testKeepers) (master *TestKeeper, standbys []*TestKeeper) { // Wait for normal cluster phase (master ready) masterUID, err := WaitClusterDataWithMaster(sm, 60*time.Second) if err != nil { @@ -314,7 +315,7 @@ func testMasterStandby(t *testing.T, syncRepl bool) { defer shutdown(tks, tss, tp, tstore) storePath := filepath.Join(common.StoreBasePath, clusterName) - sm := store.NewStoreManager(tstore.store, storePath) + sm := store.NewStore(tstore.store, storePath) master, standbys := waitMasterStandbysReady(t, sm, tks) standby := standbys[0] @@ -378,7 +379,7 @@ func testFailover(t *testing.T, syncRepl bool, standbyCluster bool) { defer shutdown(tks, tss, tp, tstore) storePath := filepath.Join(common.StoreBasePath, clusterName) - sm := store.NewStoreManager(tstore.store, storePath) + sm := store.NewStore(tstore.store, storePath) master, standbys := waitMasterStandbysReady(t, sm, tks) standby := standbys[0] @@ -486,7 +487,7 @@ func testFailoverFailed(t *testing.T, syncRepl bool, standbyCluster bool) { defer shutdown(tks, tss, tp, tstore) storePath := filepath.Join(common.StoreBasePath, clusterName) - sm := store.NewStoreManager(tstore.store, storePath) + sm := store.NewStore(tstore.store, storePath) master, standbys := waitMasterStandbysReady(t, sm, tks) standby := standbys[0] @@ -585,7 +586,7 @@ func testFailoverTooMuchLag(t *testing.T, standbyCluster bool) { defer shutdown(tks, tss, tp, tstore) storePath := filepath.Join(common.StoreBasePath, clusterName) - sm := store.NewStoreManager(tstore.store, storePath) + sm := store.NewStore(tstore.store, storePath) master, standbys := waitMasterStandbysReady(t, sm, tks) standby := standbys[0] @@ -663,7 +664,7 @@ func testOldMasterRestart(t *testing.T, syncRepl, usePgrewind bool, standbyClust storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port) storePath := filepath.Join(common.StoreBasePath, clusterName) - sm := store.NewStoreManager(tstore.store, storePath) + sm := store.NewStore(tstore.store, storePath) master, standbys := waitMasterStandbysReady(t, sm, tks) @@ -796,7 +797,7 @@ func testPartition1(t *testing.T, syncRepl, usePgrewind bool, standbyCluster boo storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port) storePath := filepath.Join(common.StoreBasePath, clusterName) - sm := store.NewStoreManager(tstore.store, storePath) + sm := store.NewStore(tstore.store, storePath) master, standbys := waitMasterStandbysReady(t, sm, tks) @@ -932,7 +933,7 @@ func testTimelineFork(t *testing.T, syncRepl, usePgrewind bool) { storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port) storePath := filepath.Join(common.StoreBasePath, clusterName) - sm := store.NewStoreManager(tstore.store, storePath) + sm := store.NewStore(tstore.store, storePath) master, standbys := waitMasterStandbysReady(t, sm, tks) @@ -1111,7 +1112,7 @@ func testMasterChangedAddress(t *testing.T, standbyCluster bool) { defer shutdown(tks, tss, tp, tstore) storePath := filepath.Join(common.StoreBasePath, clusterName) - sm := store.NewStoreManager(tstore.store, storePath) + sm := store.NewStore(tstore.store, storePath) master, standbys := waitMasterStandbysReady(t, sm, tks) @@ -1198,7 +1199,7 @@ func TestFailedStandby(t *testing.T) { defer shutdown(tks, tss, tp, tstore) storePath := filepath.Join(common.StoreBasePath, clusterName) - sm := store.NewStoreManager(tstore.store, storePath) + sm := store.NewStore(tstore.store, storePath) // Wait for clusterView containing a master masterUID, err := WaitClusterDataWithMaster(sm, 30*time.Second) @@ -1226,7 +1227,7 @@ func TestFailedStandby(t *testing.T) { t.Fatalf("expected 2 DBs in cluster data: %v", err) } - cd, _, err := sm.GetClusterData() + cd, _, err := sm.GetClusterData(context.TODO()) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -1290,7 +1291,7 @@ func TestLoweredMaxStandbysPerSender(t *testing.T) { storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port) storePath := filepath.Join(common.StoreBasePath, clusterName) - sm := store.NewStoreManager(tstore.store, storePath) + sm := store.NewStore(tstore.store, storePath) // Wait for clusterView containing a master masterUID, err := WaitClusterDataWithMaster(sm, 30*time.Second) @@ -1357,7 +1358,7 @@ func TestKeeperRemoval(t *testing.T) { storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port) storePath := filepath.Join(common.StoreBasePath, clusterName) - sm := store.NewStoreManager(tstore.store, storePath) + sm := store.NewStore(tstore.store, storePath) master, standbys := waitMasterStandbysReady(t, sm, tks) standby1 := standbys[0] @@ -1460,7 +1461,7 @@ func testKeeperRemovalStolonCtl(t *testing.T, syncRepl bool) { storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port) storePath := filepath.Join(common.StoreBasePath, clusterName) - sm := store.NewStoreManager(tstore.store, storePath) + sm := store.NewStore(tstore.store, storePath) master, standbys := waitMasterStandbysReady(t, sm, tks) @@ -1505,7 +1506,7 @@ func testKeeperRemovalStolonCtl(t *testing.T, syncRepl bool) { } // get current stanbdys[0] db uid - cd, _, err := sm.GetClusterData() + cd, _, err := sm.GetClusterData(context.TODO()) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -1566,7 +1567,7 @@ func TestStandbyCantSync(t *testing.T) { defer shutdown(tks, tss, tp, tstore) storePath := filepath.Join(common.StoreBasePath, clusterName) - sm := store.NewStoreManager(tstore.store, storePath) + sm := store.NewStore(tstore.store, storePath) master, standbys := waitMasterStandbysReady(t, sm, tks) @@ -1616,7 +1617,7 @@ func TestStandbyCantSync(t *testing.T) { } // get current stanbdys[0] db uid - cd, _, err := sm.GetClusterData() + cd, _, err := sm.GetClusterData(context.TODO()) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -1645,7 +1646,7 @@ func TestStandbyCantSync(t *testing.T) { // check that the current stanbdys[0] db uid is different. This means the // sentinel found that standbys[0] won't sync due to missing wals and asked // the keeper to resync (defining e new db in the cluster data) - cd, _, err = sm.GetClusterData() + cd, _, err = sm.GetClusterData(context.TODO()) if err != nil { t.Fatalf("unexpected err: %v", err) } diff --git a/tests/integration/init_test.go b/tests/integration/init_test.go index 7b1df64..45a1ee2 100644 --- a/tests/integration/init_test.go +++ b/tests/integration/init_test.go @@ -15,6 +15,7 @@ package integration import ( + "context" "fmt" "io/ioutil" "os" @@ -106,7 +107,7 @@ func testInitNew(t *testing.T, merge bool) { storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port) storePath := filepath.Join(common.StoreBasePath, clusterName) - sm := store.NewStoreManager(tstore.store, storePath) + sm := store.NewStore(tstore.store, storePath) initialClusterSpec := &cluster.ClusterSpec{ InitMode: cluster.ClusterInitModeP(cluster.ClusterInitModeNew), @@ -144,7 +145,7 @@ func testInitNew(t *testing.T, merge bool) { t.Fatalf("unexpected err: %v", err) } - cd, _, err := sm.GetClusterData() + cd, _, err := sm.GetClusterData(context.TODO()) // max_connection should be set by initdb _, ok := cd.Cluster.Spec.PGParameters["max_connections"] if merge && !ok { @@ -182,7 +183,7 @@ func testInitExisting(t *testing.T, merge bool) { storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port) storePath := filepath.Join(common.StoreBasePath, clusterName) - sm := store.NewStoreManager(tstore.store, storePath) + sm := store.NewStore(tstore.store, storePath) initialClusterSpec := &cluster.ClusterSpec{ InitMode: cluster.ClusterInitModeP(cluster.ClusterInitModeNew), @@ -283,7 +284,7 @@ func testInitExisting(t *testing.T, merge bool) { t.Fatalf("expected archive_mode empty") } - cd, _, err := sm.GetClusterData() + cd, _, err := sm.GetClusterData(context.TODO()) // max_connection should be set by initdb v, ok = cd.Cluster.Spec.PGParameters["archive_mode"] if merge && v != "on" { @@ -328,7 +329,7 @@ func TestInitUsers(t *testing.T) { clusterName = uuid.NewV4().String() storePath := filepath.Join(common.StoreBasePath, clusterName) - sm := store.NewStoreManager(tstore.store, storePath) + sm := store.NewStore(tstore.store, storePath) initialClusterSpec := &cluster.ClusterSpec{ InitMode: cluster.ClusterInitModeP(cluster.ClusterInitModeNew), @@ -371,7 +372,7 @@ func TestInitUsers(t *testing.T) { clusterName = uuid.NewV4().String() storePath = filepath.Join(common.StoreBasePath, clusterName) - sm = store.NewStoreManager(tstore.store, storePath) + sm = store.NewStore(tstore.store, storePath) ts2, err := NewTestSentinel(t, dir, clusterName, tstore.storeBackend, storeEndpoints, fmt.Sprintf("--initial-cluster-spec=%s", initialClusterSpecFile)) if err != nil { @@ -419,7 +420,7 @@ func TestInitialClusterSpec(t *testing.T) { storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port) storePath := filepath.Join(common.StoreBasePath, clusterName) - sm := store.NewStoreManager(tstore.store, storePath) + sm := store.NewStore(tstore.store, storePath) initialClusterSpec := &cluster.ClusterSpec{ InitMode: cluster.ClusterInitModeP(cluster.ClusterInitModeNew), @@ -447,7 +448,7 @@ func TestInitialClusterSpec(t *testing.T) { t.Fatal("expected cluster in initializing phase") } - cd, _, err := sm.GetClusterData() + cd, _, err := sm.GetClusterData(context.TODO()) if err != nil { t.Fatalf("unexpected err: %v", err) } diff --git a/tests/integration/pitr_test.go b/tests/integration/pitr_test.go index dd510ff..aed455f 100644 --- a/tests/integration/pitr_test.go +++ b/tests/integration/pitr_test.go @@ -15,6 +15,7 @@ package integration import ( + "context" "fmt" "io/ioutil" "os" @@ -56,7 +57,7 @@ func TestPITR(t *testing.T) { storePath := filepath.Join(common.StoreBasePath, clusterName) - sm := store.NewStoreManager(tstore.store, storePath) + sm := store.NewStore(tstore.store, storePath) initialClusterSpec := &cluster.ClusterSpec{ InitMode: cluster.ClusterInitModeP(cluster.ClusterInitModeNew), @@ -130,11 +131,11 @@ func TestPITR(t *testing.T) { ts.Stop() // Delete the current cluster data - if err := tstore.store.Delete(filepath.Join(storePath, "clusterdata")); err != nil { + if err := tstore.store.Delete(context.TODO(), filepath.Join(storePath, "clusterdata")); err != nil { t.Fatalf("unexpected err: %v", err) } // Delete sentinel leader key to just speedup new election - if err := tstore.store.Delete(filepath.Join(storePath, common.SentinelLeaderKey)); err != nil { + if err := tstore.store.Delete(context.TODO(), filepath.Join(storePath, common.SentinelLeaderKey)); err != nil && err != store.ErrKeyNotFound { t.Fatalf("unexpected err: %v", err) } diff --git a/tests/integration/proxy_test.go b/tests/integration/proxy_test.go index 08764a7..4549dab 100644 --- a/tests/integration/proxy_test.go +++ b/tests/integration/proxy_test.go @@ -15,6 +15,7 @@ package integration import ( + "context" "fmt" "io/ioutil" "os" @@ -78,7 +79,7 @@ func TestProxyListening(t *testing.T) { storePath := filepath.Join(common.StoreBasePath, clusterName) - sm := store.NewStoreManager(tstore.store, storePath) + sm := store.NewStore(tstore.store, storePath) cd := &cluster.ClusterData{ FormatVersion: cluster.CurrentCDFormatVersion, @@ -126,7 +127,7 @@ func TestProxyListening(t *testing.T) { }, }, } - pair, err := sm.AtomicPutClusterData(cd, nil) + pair, err := sm.AtomicPutClusterData(context.TODO(), cd, nil) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -196,7 +197,7 @@ func TestProxyListening(t *testing.T) { t.Logf("test proxyConf removed. Should continue listening") // remove proxyConf cd.Proxy.Spec.MasterDBUID = "" - pair, err = sm.AtomicPutClusterData(cd, pair) + pair, err = sm.AtomicPutClusterData(context.TODO(), cd, pair) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -209,7 +210,7 @@ func TestProxyListening(t *testing.T) { t.Logf("test proxyConf restored. Should continue listening") // Set proxyConf again cd.Proxy.Spec.MasterDBUID = "01" - pair, err = sm.AtomicPutClusterData(cd, pair) + pair, err = sm.AtomicPutClusterData(context.TODO(), cd, pair) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -221,7 +222,7 @@ func TestProxyListening(t *testing.T) { t.Logf("test clusterView removed. Should continue listening") // remove whole clusterview - _, err = sm.AtomicPutClusterData(nil, pair) + _, err = sm.AtomicPutClusterData(context.TODO(), nil, pair) if err != nil { t.Fatalf("unexpected err: %v", err) } diff --git a/tests/integration/standby_test.go b/tests/integration/standby_test.go index c0bf59f..61b9a37 100644 --- a/tests/integration/standby_test.go +++ b/tests/integration/standby_test.go @@ -46,7 +46,7 @@ func TestInitStandbyCluster(t *testing.T) { primaryStoreEndpoints := fmt.Sprintf("%s:%s", ptstore.listenAddress, ptstore.port) pStorePath := filepath.Join(common.StoreBasePath, primaryClusterName) - psm := store.NewStoreManager(ptstore.store, pStorePath) + psm := store.NewStore(ptstore.store, pStorePath) initialClusterSpec := &cluster.ClusterSpec{ InitMode: cluster.ClusterInitModeP(cluster.ClusterInitModeNew), @@ -95,7 +95,7 @@ func TestInitStandbyCluster(t *testing.T) { storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port) storePath := filepath.Join(common.StoreBasePath, clusterName) - sm := store.NewStoreManager(tstore.store, storePath) + sm := store.NewStore(tstore.store, storePath) pgpass, err := ioutil.TempFile(dir, "pgpass") if err != nil { @@ -175,7 +175,7 @@ func TestPromoteStandbyCluster(t *testing.T) { primaryStoreEndpoints := fmt.Sprintf("%s:%s", ptstore.listenAddress, ptstore.port) pStorePath := filepath.Join(common.StoreBasePath, primaryClusterName) - psm := store.NewStoreManager(ptstore.store, pStorePath) + psm := store.NewStore(ptstore.store, pStorePath) initialClusterSpec := &cluster.ClusterSpec{ InitMode: cluster.ClusterInitModeP(cluster.ClusterInitModeNew), @@ -224,7 +224,7 @@ func TestPromoteStandbyCluster(t *testing.T) { storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port) storePath := filepath.Join(common.StoreBasePath, clusterName) - sm := store.NewStoreManager(tstore.store, storePath) + sm := store.NewStore(tstore.store, storePath) pgpass, err := ioutil.TempFile(dir, "pgpass") if err != nil { diff --git a/tests/integration/utils.go b/tests/integration/utils.go index c10a5cc..f2e6b8f 100644 --- a/tests/integration/utils.go +++ b/tests/integration/utils.go @@ -16,6 +16,7 @@ package integration import ( "bufio" + "context" "database/sql" "encoding/json" "fmt" @@ -38,7 +39,6 @@ import ( "github.com/sorintlab/stolon/pkg/store" "github.com/sorintlab/stolon/pkg/util" - kvstore "github.com/docker/libkv/store" _ "github.com/lib/pq" "github.com/satori/go.uuid" "github.com/sgotti/gexpect" @@ -693,22 +693,25 @@ type TestStore struct { Process listenAddress string port string - store kvstore.Store + store store.KVStore storeBackend store.Backend } func NewTestStore(t *testing.T, dir string, a ...string) (*TestStore, error) { storeBackend := store.Backend(os.Getenv("STOLON_TEST_STORE_BACKEND")) switch storeBackend { - case store.CONSUL: + case "consul": return NewTestConsul(t, dir, a...) - case store.ETCD: - return NewTestEtcd(t, dir, a...) + case "etcd": + storeBackend = "etcdv2" + fallthrough + case "etcdv2", "etcdv3": + return NewTestEtcd(t, dir, storeBackend, a...) } return nil, fmt.Errorf("wrong store backend") } -func NewTestEtcd(t *testing.T, dir string, a ...string) (*TestStore, error) { +func NewTestEtcd(t *testing.T, dir string, backend store.Backend, a ...string) (*TestStore, error) { u := uuid.NewV4() uid := fmt.Sprintf("%x", u[:4]) @@ -736,10 +739,10 @@ func NewTestEtcd(t *testing.T, dir string, a ...string) (*TestStore, error) { storeEndpoints := fmt.Sprintf("%s:%s", listenAddress, port) storeConfig := store.Config{ - Backend: store.ETCD, + Backend: store.Backend(backend), Endpoints: storeEndpoints, } - kvstore, err := store.NewStore(storeConfig) + kvstore, err := store.NewKVStore(storeConfig) if err != nil { return nil, fmt.Errorf("cannot create store: %v", err) } @@ -760,7 +763,7 @@ func NewTestEtcd(t *testing.T, dir string, a ...string) (*TestStore, error) { listenAddress: listenAddress, port: port, store: kvstore, - storeBackend: store.ETCD, + storeBackend: backend, } return tstore, nil } @@ -825,7 +828,7 @@ func NewTestConsul(t *testing.T, dir string, a ...string) (*TestStore, error) { Backend: store.CONSUL, Endpoints: storeEndpoints, } - kvstore, err := store.NewStore(storeConfig) + kvstore, err := store.NewKVStore(storeConfig) if err != nil { return nil, fmt.Errorf("cannot create store: %v", err) } @@ -854,9 +857,9 @@ func NewTestConsul(t *testing.T, dir string, a ...string) (*TestStore, error) { func (ts *TestStore) WaitUp(timeout time.Duration) error { start := time.Now() for time.Now().Add(-timeout).Before(start) { - _, err := ts.store.Get("anykey") + _, err := ts.store.Get(context.TODO(), "anykey") ts.t.Logf("err: %v", err) - if err != nil && err == kvstore.ErrKeyNotFound { + if err != nil && err == store.ErrKeyNotFound { return nil } if err == nil { @@ -871,8 +874,8 @@ func (ts *TestStore) WaitUp(timeout time.Duration) error { func (ts *TestStore) WaitDown(timeout time.Duration) error { start := time.Now() for time.Now().Add(-timeout).Before(start) { - _, err := ts.store.Get("anykey") - if err != nil && err != kvstore.ErrKeyNotFound { + _, err := ts.store.Get(context.TODO(), "anykey") + if err != nil && err != store.ErrKeyNotFound { return nil } time.Sleep(sleepInterval) @@ -881,10 +884,10 @@ func (ts *TestStore) WaitDown(timeout time.Duration) error { return fmt.Errorf("timeout") } -func WaitClusterDataWithMaster(e *store.StoreManager, timeout time.Duration) (string, error) { +func WaitClusterDataWithMaster(e *store.Store, timeout time.Duration) (string, error) { start := time.Now() for time.Now().Add(-timeout).Before(start) { - cd, _, err := e.GetClusterData() + cd, _, err := e.GetClusterData(context.TODO()) if err != nil || cd == nil { goto end } @@ -897,10 +900,10 @@ func WaitClusterDataWithMaster(e *store.StoreManager, timeout time.Duration) (st return "", fmt.Errorf("timeout") } -func WaitClusterDataMaster(master string, e *store.StoreManager, timeout time.Duration) error { +func WaitClusterDataMaster(master string, e *store.Store, timeout time.Duration) error { start := time.Now() for time.Now().Add(-timeout).Before(start) { - cd, _, err := e.GetClusterData() + cd, _, err := e.GetClusterData(context.TODO()) if err != nil || cd == nil { goto end } @@ -915,10 +918,10 @@ func WaitClusterDataMaster(master string, e *store.StoreManager, timeout time.Du return fmt.Errorf("timeout") } -func WaitClusterDataKeeperInitialized(keeperUID string, e *store.StoreManager, timeout time.Duration) error { +func WaitClusterDataKeeperInitialized(keeperUID string, e *store.Store, timeout time.Duration) error { start := time.Now() for time.Now().Add(-timeout).Before(start) { - cd, _, err := e.GetClusterData() + cd, _, err := e.GetClusterData(context.TODO()) if err != nil || cd == nil { goto end } @@ -939,11 +942,11 @@ func WaitClusterDataKeeperInitialized(keeperUID string, e *store.StoreManager, t // WaitClusterDataSynchronousStandbys waits for: // * synchronous standby defined in masterdb spec // * synchronous standby reported from masterdb status -func WaitClusterDataSynchronousStandbys(synchronousStandbys []string, e *store.StoreManager, timeout time.Duration) error { +func WaitClusterDataSynchronousStandbys(synchronousStandbys []string, e *store.Store, timeout time.Duration) error { sort.Sort(sort.StringSlice(synchronousStandbys)) start := time.Now() for time.Now().Add(-timeout).Before(start) { - cd, _, err := e.GetClusterData() + cd, _, err := e.GetClusterData(context.TODO()) if err != nil || cd == nil { goto end } @@ -982,10 +985,10 @@ func WaitClusterDataSynchronousStandbys(synchronousStandbys []string, e *store.S return fmt.Errorf("timeout") } -func WaitClusterPhase(e *store.StoreManager, phase cluster.ClusterPhase, timeout time.Duration) error { +func WaitClusterPhase(e *store.Store, phase cluster.ClusterPhase, timeout time.Duration) error { start := time.Now() for time.Now().Add(-timeout).Before(start) { - cd, _, err := e.GetClusterData() + cd, _, err := e.GetClusterData(context.TODO()) if err != nil || cd == nil { goto end } @@ -998,10 +1001,10 @@ func WaitClusterPhase(e *store.StoreManager, phase cluster.ClusterPhase, timeout return fmt.Errorf("timeout") } -func WaitNumDBs(e *store.StoreManager, n int, timeout time.Duration) error { +func WaitNumDBs(e *store.Store, n int, timeout time.Duration) error { start := time.Now() for time.Now().Add(-timeout).Before(start) { - cd, _, err := e.GetClusterData() + cd, _, err := e.GetClusterData(context.TODO()) if err != nil || cd == nil { goto end } @@ -1014,10 +1017,10 @@ func WaitNumDBs(e *store.StoreManager, n int, timeout time.Duration) error { return fmt.Errorf("timeout") } -func WaitStandbyKeeper(e *store.StoreManager, keeperUID string, timeout time.Duration) error { +func WaitStandbyKeeper(e *store.Store, keeperUID string, timeout time.Duration) error { start := time.Now() for time.Now().Add(-timeout).Before(start) { - cd, _, err := e.GetClusterData() + cd, _, err := e.GetClusterData(context.TODO()) if err != nil || cd == nil { goto end } @@ -1036,10 +1039,10 @@ func WaitStandbyKeeper(e *store.StoreManager, keeperUID string, timeout time.Dur return fmt.Errorf("timeout") } -func WaitClusterDataKeepers(keepersUIDs []string, e *store.StoreManager, timeout time.Duration) error { +func WaitClusterDataKeepers(keepersUIDs []string, e *store.Store, timeout time.Duration) error { start := time.Now() for time.Now().Add(-timeout).Before(start) { - cd, _, err := e.GetClusterData() + cd, _, err := e.GetClusterData(context.TODO()) if err != nil || cd == nil { goto end } @@ -1061,12 +1064,12 @@ func WaitClusterDataKeepers(keepersUIDs []string, e *store.StoreManager, timeout // WaitClusterSyncedXLogPos waits for all the specified keepers to have the same // reported XLogPos -func WaitClusterSyncedXLogPos(keepersUIDs []string, e *store.StoreManager, timeout time.Duration) error { +func WaitClusterSyncedXLogPos(keepersUIDs []string, e *store.Store, timeout time.Duration) error { start := time.Now() for time.Now().Add(-timeout).Before(start) { c := 0 curXLogPos := uint64(0) - cd, _, err := e.GetClusterData() + cd, _, err := e.GetClusterData(context.TODO()) if err != nil || cd == nil { goto end }