*: add support for etcd v3 api

for etcdv2 and consul we keep using libkv while for etcd v3 api we'll directly
use the etcdclientv3. We abstract the implementation under a KVStore and an
Election interface. We haven't implemented a libkv store for etcdv3 since libkv
looks quite dead and doesn't support the context package. So in future we'll
probably move out from libkv to better use context features.
This commit is contained in:
Simone Gotti 2017-12-22 14:46:45 +01:00
parent 945b863eff
commit e84ff2cda3
25 changed files with 676 additions and 213 deletions

View File

@ -34,7 +34,7 @@ type CommonConfig struct {
}
func AddCommonFlags(cmd *cobra.Command, cfg *CommonConfig) {
cmd.PersistentFlags().StringVar(&cfg.StoreBackend, "store-backend", "", "store backend type (etcd or consul)")
cmd.PersistentFlags().StringVar(&cfg.StoreBackend, "store-backend", "", "store backend type (etcdv2/etcd, etcdv3 or consul)")
cmd.PersistentFlags().StringVar(&cfg.StoreEndpoints, "store-endpoints", "", "a comma-delimited list of store endpoints (use https scheme for tls communication) (defaults: http://127.0.0.1:2379 for etcd, http://127.0.0.1:8500 for consul)")
cmd.PersistentFlags().StringVar(&cfg.StoreCertFile, "store-cert-file", "", "certificate file for client identification to the store")
cmd.PersistentFlags().StringVar(&cfg.StoreKeyFile, "store-key", "", "private key file for client identification to the store")
@ -54,6 +54,10 @@ func CheckCommonConfig(cfg *CommonConfig) error {
switch cfg.StoreBackend {
case "consul":
case "etcd":
// etcd is old alias for etcdv2
cfg.StoreBackend = "etcdv2"
case "etcdv2":
case "etcdv3":
default:
return fmt.Errorf("Unknown store backend: %q", cfg.StoreBackend)
}

View File

@ -364,7 +364,7 @@ type PostgresKeeper struct {
sleepInterval time.Duration
requestTimeout time.Duration
e *store.StoreManager
e *store.Store
pgm *postgresql.Manager
stop chan bool
end chan error
@ -381,7 +381,7 @@ type PostgresKeeper struct {
func NewPostgresKeeper(cfg *config, stop chan bool, end chan error) (*PostgresKeeper, error) {
storePath := filepath.Join(common.StoreBasePath, cfg.ClusterName)
kvstore, err := store.NewStore(store.Config{
kvstore, err := store.NewKVStore(store.Config{
Backend: store.Backend(cfg.StoreBackend),
Endpoints: cfg.StoreEndpoints,
CertFile: cfg.StoreCertFile,
@ -392,7 +392,7 @@ func NewPostgresKeeper(cfg *config, stop chan bool, end chan error) (*PostgresKe
if err != nil {
return nil, fmt.Errorf("cannot create store: %v", err)
}
e := store.NewStoreManager(kvstore, storePath)
e := store.NewStore(kvstore, storePath)
// Clean and get absolute datadir path
dataDir, err := filepath.Abs(cfg.dataDir)
@ -480,7 +480,7 @@ func (p *PostgresKeeper) updateKeeperInfo() error {
// The time to live is just to automatically remove old entries, it's
// not used to determine if the keeper info has been updated.
if err := p.e.SetKeeperInfo(keeperUID, keeperInfo, p.sleepInterval); err != nil {
if err := p.e.SetKeeperInfo(context.TODO(), keeperUID, keeperInfo, p.sleepInterval); err != nil {
return err
}
return nil
@ -651,7 +651,7 @@ func (p *PostgresKeeper) Start() {
var err error
var cd *cluster.ClusterData
cd, _, err = p.e.GetClusterData()
cd, _, err = p.e.GetClusterData(context.TODO())
if err != nil {
log.Errorw("error retrieving cluster data", zap.Error(err))
} else if cd != nil {
@ -841,7 +841,7 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
e := p.e
pgm := p.pgm
cd, _, err := e.GetClusterData()
cd, _, err := e.GetClusterData(pctx)
if err != nil {
log.Errorw("error retrieving cluster data", zap.Error(err))
return

View File

@ -15,6 +15,7 @@
package main
import (
"context"
"fmt"
"net"
"os"
@ -100,7 +101,7 @@ type ClusterChecker struct {
listener *net.TCPListener
pp *pollon.Proxy
e *store.StoreManager
e *store.Store
endPollonProxyCh chan error
pollonMutex sync.Mutex
@ -109,7 +110,7 @@ type ClusterChecker struct {
func NewClusterChecker(uid string, cfg config) (*ClusterChecker, error) {
storePath := filepath.Join(common.StoreBasePath, cfg.ClusterName)
kvstore, err := store.NewStore(store.Config{
kvstore, err := store.NewKVStore(store.Config{
Backend: store.Backend(cfg.StoreBackend),
Endpoints: cfg.StoreEndpoints,
CertFile: cfg.StoreCertFile,
@ -120,7 +121,7 @@ func NewClusterChecker(uid string, cfg config) (*ClusterChecker, error) {
if err != nil {
return nil, fmt.Errorf("cannot create store: %v", err)
}
e := store.NewStoreManager(kvstore, storePath)
e := store.NewStore(kvstore, storePath)
return &ClusterChecker{
uid: uid,
@ -189,14 +190,14 @@ func (c *ClusterChecker) sendPollonConfData(confData pollon.ConfData) {
}
}
func (c *ClusterChecker) SetProxyInfo(e *store.StoreManager, generation int64, ttl time.Duration) error {
func (c *ClusterChecker) SetProxyInfo(e *store.Store, generation int64, ttl time.Duration) error {
proxyInfo := &cluster.ProxyInfo{
UID: c.uid,
Generation: generation,
}
log.Debugf("proxyInfo dump: %s", spew.Sdump(proxyInfo))
if err := c.e.SetProxyInfo(proxyInfo, ttl); err != nil {
if err := c.e.SetProxyInfo(context.TODO(), proxyInfo, ttl); err != nil {
return err
}
return nil
@ -204,7 +205,7 @@ func (c *ClusterChecker) SetProxyInfo(e *store.StoreManager, generation int64, t
// Check reads the cluster data and applies the right pollon configuration.
func (c *ClusterChecker) Check() error {
cd, _, err := c.e.GetClusterData()
cd, _, err := c.e.GetClusterData(context.TODO())
if err != nil {
return fmt.Errorf("cannot get cluster data: %v", err)
}
@ -311,6 +312,10 @@ func (c *ClusterChecker) Start() error {
checkCh := make(chan error)
timerCh := time.NewTimer(0).C
// TODO(sgotti) TimeoutCecker is needed to forcefully close connection also
// if the Check method is blocked somewhere.
// The idomatic/cleaner solution will be to use a context instead of this
// TimeoutChecker but we have to change the libkv stores to support contexts.
go c.TimeoutChecker(checkOkCh)
for true {

View File

@ -40,7 +40,6 @@ import (
"github.com/sorintlab/stolon/pkg/util"
"github.com/davecgh/go-spew/spew"
"github.com/docker/leadership"
"github.com/mitchellh/copystructure"
"github.com/spf13/cobra"
"go.uber.org/zap"
@ -96,7 +95,7 @@ func die(format string, a ...interface{}) {
func (s *Sentinel) electionLoop() {
for {
log.Infow("Trying to acquire sentinels leadership")
electedCh, errCh := s.candidate.RunForElection()
electedCh, errCh := s.election.RunForElection()
for {
select {
case elected := <-electedCh:
@ -120,6 +119,7 @@ func (s *Sentinel) electionLoop() {
goto end
case <-s.stop:
log.Debugw("stopping election loop")
s.election.Stop()
return
}
}
@ -136,22 +136,18 @@ func (s *Sentinel) syncRepl(spec *cluster.ClusterSpec) bool {
return *spec.SynchronousReplication && *spec.Role == cluster.ClusterRoleMaster
}
func (s *Sentinel) setSentinelInfo(ttl time.Duration) error {
func (s *Sentinel) setSentinelInfo(ctx context.Context, ttl time.Duration) error {
sentinelInfo := &cluster.SentinelInfo{
UID: s.uid,
}
log.Debugw("sentinelInfo dump", "sentinelInfo", sentinelInfo)
if err := s.e.SetSentinelInfo(sentinelInfo, ttl); err != nil {
if err := s.e.SetSentinelInfo(ctx, sentinelInfo, ttl); err != nil {
return err
}
return nil
}
func (s *Sentinel) getKeepersInfo(ctx context.Context) (cluster.KeepersInfo, error) {
return s.e.GetKeepersInfo()
}
func (s *Sentinel) SetKeeperError(uid string) {
if _, ok := s.keeperErrorTimers[uid]; !ok {
s.keeperErrorTimers[uid] = timer.Now()
@ -1443,11 +1439,11 @@ type DBConvergenceInfo struct {
type Sentinel struct {
uid string
cfg *config
e *store.StoreManager
e *store.Store
candidate *leadership.Candidate
stop chan bool
end chan bool
election store.Election
stop chan bool
end chan bool
lastLeadershipCount uint
@ -1493,7 +1489,7 @@ func NewSentinel(uid string, cfg *config, stop chan bool, end chan bool) (*Senti
storePath := filepath.Join(common.StoreBasePath, cfg.ClusterName)
kvstore, err := store.NewStore(store.Config{
kvstore, err := store.NewKVStore(store.Config{
Backend: store.Backend(cfg.StoreBackend),
Endpoints: cfg.StoreEndpoints,
CertFile: cfg.StoreCertFile,
@ -1504,15 +1500,15 @@ func NewSentinel(uid string, cfg *config, stop chan bool, end chan bool) (*Senti
if err != nil {
return nil, fmt.Errorf("cannot create store: %v", err)
}
e := store.NewStoreManager(kvstore, storePath)
e := store.NewStore(kvstore, storePath)
candidate := leadership.NewCandidate(kvstore, filepath.Join(storePath, common.SentinelLeaderKey), uid, store.MinTTL)
election := store.NewElection(kvstore, filepath.Join(storePath, common.SentinelLeaderKey), uid)
return &Sentinel{
uid: uid,
cfg: cfg,
e: e,
candidate: candidate,
election: election,
leader: false,
initialClusterSpec: initialClusterSpec,
stop: stop,
@ -1539,9 +1535,8 @@ func (s *Sentinel) Start() {
for true {
select {
case <-s.stop:
log.Debugw("stopping stolon sentinel")
log.Infow("stopping stolon sentinel")
cancel()
s.candidate.Stop()
s.end <- true
return
case <-timerCh:
@ -1566,7 +1561,7 @@ func (s *Sentinel) clusterSentinelCheck(pctx context.Context) {
defer s.updateMutex.Unlock()
e := s.e
cd, prevCDPair, err := e.GetClusterData()
cd, prevCDPair, err := e.GetClusterData(pctx)
if err != nil {
log.Errorw("error retrieving cluster data", zap.Error(err))
return
@ -1599,27 +1594,25 @@ func (s *Sentinel) clusterSentinelCheck(pctx context.Context) {
log.Infow("writing initial cluster data")
newcd := cluster.NewClusterData(c)
log.Debugf("newcd dump: %s", spew.Sdump(newcd))
if _, err = e.AtomicPutClusterData(newcd, nil); err != nil {
if _, err = e.AtomicPutClusterData(pctx, newcd, nil); err != nil {
log.Errorw("error saving cluster data", zap.Error(err))
}
return
}
if err = s.setSentinelInfo(2 * s.sleepInterval); err != nil {
if err = s.setSentinelInfo(pctx, 2*s.sleepInterval); err != nil {
log.Errorw("cannot update sentinel info", zap.Error(err))
return
}
ctx, cancel := context.WithTimeout(pctx, s.requestTimeout)
keepersInfo, err := s.getKeepersInfo(ctx)
cancel()
keepersInfo, err := s.e.GetKeepersInfo(pctx)
if err != nil {
log.Errorw("cannot get keepers info", zap.Error(err))
return
}
log.Debugf("keepersInfo dump: %s", spew.Sdump(keepersInfo))
proxiesInfo, err := s.e.GetProxiesInfo()
proxiesInfo, err := s.e.GetProxiesInfo(pctx)
if err != nil {
log.Errorw("failed to get proxies info", zap.Error(err))
return
@ -1661,7 +1654,7 @@ func (s *Sentinel) clusterSentinelCheck(pctx context.Context) {
log.Debugf("newcd dump after updateCluster: %s", spew.Sdump(newcd))
if newcd != nil {
if _, err := e.AtomicPutClusterData(newcd, prevCDPair); err != nil {
if _, err := e.AtomicPutClusterData(pctx, newcd, prevCDPair); err != nil {
log.Errorw("error saving clusterdata", zap.Error(err))
}
}

View File

@ -39,11 +39,13 @@ func init() {
}
func clusterdata(cmd *cobra.Command, args []string) {
e, err := NewStore()
kvStore, err := NewKVStore()
if err != nil {
die("cannot create store: %v", err)
}
e := NewStore(kvStore)
cd, _, err := getClusterData(e)
if err != nil {
die("%v", err)

View File

@ -15,6 +15,7 @@
package main
import (
"context"
"encoding/json"
"io/ioutil"
"os"
@ -70,12 +71,14 @@ func initCluster(cmd *cobra.Command, args []string) {
}
}
e, err := NewStore()
kvStore, err := NewKVStore()
if err != nil {
die("cannot create store: %v", err)
}
cd, _, err := e.GetClusterData()
e := NewStore(kvStore)
cd, _, err := e.GetClusterData(context.TODO())
if err != nil {
die("cannot get cluster data: %v", err)
}
@ -96,7 +99,7 @@ func initCluster(cmd *cobra.Command, args []string) {
os.Exit(0)
}
cd, _, err = e.GetClusterData()
cd, _, err = e.GetClusterData(context.TODO())
if err != nil {
die("cannot get cluster data: %v", err)
}
@ -120,7 +123,7 @@ func initCluster(cmd *cobra.Command, args []string) {
cd = cluster.NewClusterData(c)
// We ignore if cd has been modified between reading and writing
if err := e.PutClusterData(cd); err != nil {
if err := e.PutClusterData(context.TODO(), cd); err != nil {
die("cannot update cluster data: %v", err)
}
}

View File

@ -15,6 +15,7 @@
package main
import (
"context"
"os"
"github.com/sorintlab/stolon/pkg/cluster"
@ -40,11 +41,13 @@ func promote(cmd *cobra.Command, args []string) {
die("too many arguments")
}
e, err := NewStore()
kvStore, err := NewKVStore()
if err != nil {
die("cannot create store: %v", err)
}
e := NewStore(kvStore)
accepted := true
if !initOpts.forceYes {
accepted, err = askConfirmation("Are you sure you want to continue? [yes/no] ")
@ -82,7 +85,7 @@ func promote(cmd *cobra.Command, args []string) {
}
// retry if cd has been modified between reading and writing
_, err = e.AtomicPutClusterData(cd, pair)
_, err = e.AtomicPutClusterData(context.TODO(), cd, pair)
if err != nil {
if err == libkvstore.ErrKeyModified {
retry++

View File

@ -15,6 +15,8 @@
package main
import (
"context"
"github.com/sorintlab/stolon/pkg/cluster"
"github.com/spf13/cobra"
)
@ -40,11 +42,13 @@ func removeKeeper(cmd *cobra.Command, args []string) {
keeperID := args[0]
store, err := NewStore()
kvStore, err := NewKVStore()
if err != nil {
die("cannot create store: %v", err)
}
store := NewStore(kvStore)
cd, pair, err := getClusterData(store)
if err != nil {
die("cannot get cluster data: %v", err)
@ -78,7 +82,7 @@ func removeKeeper(cmd *cobra.Command, args []string) {
// NOTE: if the removed db is listed inside another db.Followers it'll will
// be cleaned by the sentinels
_, err = store.AtomicPutClusterData(newCd, pair)
_, err = store.AtomicPutClusterData(context.TODO(), newCd, pair)
if err != nil {
die("cannot update cluster data: %v", err)
}

View File

@ -39,11 +39,13 @@ func init() {
}
func spec(cmd *cobra.Command, args []string) {
e, err := NewStore()
kvStore, err := NewKVStore()
if err != nil {
die("cannot create store: %v", err)
}
e := NewStore(kvStore)
cd, _, err := getClusterData(e)
if err != nil {
die("%v", err)

View File

@ -15,12 +15,14 @@
package main
import (
"context"
"fmt"
"os"
"sort"
"text/tabwriter"
"github.com/sorintlab/stolon/pkg/cluster"
"github.com/sorintlab/stolon/pkg/store"
"github.com/spf13/cobra"
)
@ -84,19 +86,22 @@ func status(cmd *cobra.Command, args []string) {
tabOut := new(tabwriter.Writer)
tabOut.Init(os.Stdout, 0, 8, 1, '\t', 0)
e, err := NewStore()
kvStore, err := NewKVStore()
if err != nil {
die("cannot create store: %v", err)
}
sentinelsInfo, err := e.GetSentinelsInfo()
e := NewStore(kvStore)
sentinelsInfo, err := e.GetSentinelsInfo(context.TODO())
if err != nil {
die("cannot get sentinels info: %v", err)
}
lsid, err := e.GetLeaderSentinelId()
if err != nil {
die("cannot get leader sentinel info")
election := NewElection(kvStore)
lsid, err := election.Leader()
if err != nil && err != store.ErrElectionNoLeader {
die("cannot get leader sentinel info: %v", err)
}
stdout("=== Active sentinels ===")
@ -118,7 +123,7 @@ func status(cmd *cobra.Command, args []string) {
}
}
proxiesInfo, err := e.GetProxiesInfo()
proxiesInfo, err := e.GetProxiesInfo(context.TODO())
if err != nil {
die("cannot get proxies info: %v", err)
}

View File

@ -16,6 +16,7 @@ package main
import (
"bufio"
"context"
"fmt"
"os"
"path/filepath"
@ -27,7 +28,6 @@ import (
"github.com/sorintlab/stolon/pkg/flagutil"
"github.com/sorintlab/stolon/pkg/store"
kvstore "github.com/docker/libkv/store"
"github.com/spf13/cobra"
)
@ -95,10 +95,8 @@ func die(format string, a ...interface{}) {
os.Exit(1)
}
func NewStore() (*store.StoreManager, error) {
storePath := filepath.Join(common.StoreBasePath, cfg.ClusterName)
kvstore, err := store.NewStore(store.Config{
func NewKVStore() (store.KVStore, error) {
return store.NewKVStore(store.Config{
Backend: store.Backend(cfg.StoreBackend),
Endpoints: cfg.StoreEndpoints,
CertFile: cfg.StoreCertFile,
@ -106,14 +104,20 @@ func NewStore() (*store.StoreManager, error) {
CAFile: cfg.StoreCAFile,
SkipTLSVerify: cfg.StoreSkipTlsVerify,
})
if err != nil {
return nil, fmt.Errorf("cannot create store: %v", err)
}
return store.NewStoreManager(kvstore, storePath), nil
}
func getClusterData(e *store.StoreManager) (*cluster.ClusterData, *kvstore.KVPair, error) {
cd, pair, err := e.GetClusterData()
func NewStore(kvStore store.KVStore) *store.Store {
storePath := filepath.Join(common.StoreBasePath, cfg.ClusterName)
return store.NewStore(kvStore, storePath)
}
func NewElection(kvStore store.KVStore) store.Election {
storePath := filepath.Join(common.StoreBasePath, cfg.ClusterName)
return store.NewElection(kvStore, filepath.Join(storePath, common.SentinelLeaderKey), "")
}
func getClusterData(e *store.Store) (*cluster.ClusterData, *store.KVPair, error) {
cd, pair, err := e.GetClusterData(context.TODO())
if err != nil {
return nil, nil, fmt.Errorf("cannot get cluster data: %v", err)
}

View File

@ -15,6 +15,7 @@
package main
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
@ -93,11 +94,13 @@ func update(cmd *cobra.Command, args []string) {
}
}
e, err := NewStore()
kvStore, err := NewKVStore()
if err != nil {
die("cannot create store: %v", err)
}
e := NewStore(kvStore)
retry := 0
for retry < maxRetries {
cd, pair, err := getClusterData(e)
@ -127,7 +130,7 @@ func update(cmd *cobra.Command, args []string) {
}
// retry if cd has been modified between reading and writing
_, err = e.AtomicPutClusterData(cd, pair)
_, err = e.AtomicPutClusterData(context.TODO(), cd, pair)
if err != nil {
if err == libkvstore.ErrKeyModified {
retry++

View File

@ -43,6 +43,7 @@ const (
)
const (
DefaultStoreTimeout = 5 * time.Second
DefaultProxyCheckInterval = 5 * time.Second
DefaultProxyTimeoutInterval = 15 * time.Second

226
pkg/store/etcdv3.go Normal file
View File

@ -0,0 +1,226 @@
// Copyright 2017 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
// limitations under the License.
package store
import (
"context"
"time"
etcdclientv3 "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
)
func fromEtcV3Error(err error) error {
switch err {
case rpctypes.ErrKeyNotFound:
return ErrKeyNotFound
case concurrency.ErrElectionNoLeader:
return ErrElectionNoLeader
}
return err
}
type etcdV3Store struct {
c *etcdclientv3.Client
requestTimeout time.Duration
}
func (s *etcdV3Store) Put(pctx context.Context, key string, value []byte, options *WriteOptions) error {
etcdv3Options := []etcdclientv3.OpOption{}
if options != nil {
if options.TTL > 0 {
ctx, cancel := context.WithTimeout(pctx, s.requestTimeout)
lease, err := s.c.Grant(ctx, int64(options.TTL.Seconds()))
cancel()
if err != nil {
return err
}
etcdv3Options = append(etcdv3Options, etcdclientv3.WithLease(lease.ID))
}
}
ctx, cancel := context.WithTimeout(pctx, s.requestTimeout)
_, err := s.c.Put(ctx, key, string(value), etcdv3Options...)
cancel()
return fromLibKVStoreErr(err)
}
func (s *etcdV3Store) Get(pctx context.Context, key string) (*KVPair, error) {
ctx, cancel := context.WithTimeout(pctx, s.requestTimeout)
resp, err := s.c.Get(ctx, key)
cancel()
if err != nil {
return nil, fromEtcV3Error(err)
}
if len(resp.Kvs) == 0 {
return nil, ErrKeyNotFound
}
kv := resp.Kvs[0]
return &KVPair{Key: string(kv.Key), Value: kv.Value, LastIndex: uint64(kv.ModRevision)}, nil
}
func (s *etcdV3Store) List(pctx context.Context, directory string) ([]*KVPair, error) {
ctx, cancel := context.WithTimeout(pctx, s.requestTimeout)
resp, err := s.c.Get(ctx, directory, etcdclientv3.WithPrefix())
cancel()
if err != nil {
return nil, fromEtcV3Error(err)
}
kvPairs := make([]*KVPair, len(resp.Kvs))
for i, kv := range resp.Kvs {
kvPairs[i] = &KVPair{Key: string(kv.Key), Value: kv.Value, LastIndex: uint64(kv.ModRevision)}
}
return kvPairs, nil
}
func (s *etcdV3Store) AtomicPut(pctx context.Context, key string, value []byte, previous *KVPair, options *WriteOptions) (*KVPair, error) {
etcdv3Options := []etcdclientv3.OpOption{}
if options != nil {
if options.TTL > 0 {
ctx, cancel := context.WithTimeout(pctx, s.requestTimeout)
lease, err := s.c.Grant(ctx, int64(options.TTL))
cancel()
if err != nil {
return nil, err
}
etcdv3Options = append(etcdv3Options, etcdclientv3.WithLease(lease.ID))
}
}
var cmp etcdclientv3.Cmp
if previous != nil {
cmp = etcdclientv3.Compare(etcdclientv3.ModRevision(key), "=", int64(previous.LastIndex))
} else {
// key doens't exists
cmp = etcdclientv3.Compare(etcdclientv3.CreateRevision(key), "=", 0)
}
ctx, cancel := context.WithTimeout(pctx, s.requestTimeout)
txn := s.c.Txn(ctx).If(cmp)
txn = txn.Then(etcdclientv3.OpPut(key, string(value), etcdv3Options...))
tresp, err := txn.Commit()
cancel()
if err != nil {
return nil, fromEtcV3Error(err)
}
if !tresp.Succeeded {
return nil, ErrKeyModified
}
revision := tresp.Responses[0].GetResponsePut().Header.Revision
return &KVPair{Key: key, Value: value, LastIndex: uint64(revision)}, nil
}
func (s *etcdV3Store) Delete(pctx context.Context, key string) error {
ctx, cancel := context.WithTimeout(pctx, s.requestTimeout)
_, err := s.c.Delete(ctx, key)
cancel()
return fromEtcV3Error(err)
}
func (s *etcdV3Store) Close() error {
return s.c.Close()
}
type etcdv3Election struct {
c *etcdclientv3.Client
path string
candidateUID string
ttl time.Duration
requestTimeout time.Duration
running bool
electedCh chan bool
errCh chan error
ctx context.Context
cancel context.CancelFunc
}
func (e *etcdv3Election) RunForElection() (<-chan bool, <-chan error) {
if e.running {
panic("already running")
}
e.electedCh = make(chan bool)
e.errCh = make(chan error)
e.ctx, e.cancel = context.WithCancel(context.Background())
e.running = true
go e.campaign()
return e.electedCh, e.errCh
}
func (e *etcdv3Election) Stop() {
if !e.running {
panic("not running")
}
e.cancel()
e.running = false
}
func (e *etcdv3Election) Leader() (string, error) {
s, err := concurrency.NewSession(e.c, concurrency.WithTTL(int(e.ttl.Seconds())))
if err != nil {
return "", fromEtcV3Error(err)
}
defer s.Close()
etcdElection := concurrency.NewElection(s, e.path)
ctx, cancel := context.WithTimeout(context.Background(), e.requestTimeout)
resp, err := etcdElection.Leader(ctx)
cancel()
if err != nil {
return "", fromEtcV3Error(err)
}
leader := string(resp.Kvs[0].Value)
return leader, nil
}
func (e *etcdv3Election) campaign() {
defer close(e.electedCh)
defer close(e.errCh)
for {
e.electedCh <- false
s, err := concurrency.NewSession(e.c, concurrency.WithTTL(int(e.ttl.Seconds())), concurrency.WithContext(e.ctx))
if err != nil {
e.running = false
e.errCh <- err
return
}
etcdElection := concurrency.NewElection(s, e.path)
if err = etcdElection.Campaign(e.ctx, e.candidateUID); err != nil {
e.running = false
e.errCh <- err
return
}
e.electedCh <- true
select {
case <-e.ctx.Done():
e.running = false
etcdElection.Resign(context.TODO())
return
case <-s.Done():
etcdElection.Resign(context.TODO())
e.electedCh <- false
}
}
}

120
pkg/store/libkv.go Normal file
View File

@ -0,0 +1,120 @@
// Copyright 2017 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
// limitations under the License.
package store
import (
"context"
"github.com/docker/leadership"
libkvstore "github.com/docker/libkv/store"
"github.com/docker/libkv/store/consul"
"github.com/docker/libkv/store/etcd"
)
func init() {
etcd.Register()
consul.Register()
}
func fromLibKVStoreErr(err error) error {
switch err {
case libkvstore.ErrKeyNotFound:
return ErrKeyNotFound
}
return err
}
type libKVStore struct {
store libkvstore.Store
}
func (s *libKVStore) Put(ctx context.Context, key string, value []byte, options *WriteOptions) error {
var libkvOptions *libkvstore.WriteOptions
if options != nil {
libkvOptions = &libkvstore.WriteOptions{TTL: options.TTL}
}
err := s.store.Put(key, value, libkvOptions)
return fromLibKVStoreErr(err)
}
func (s *libKVStore) Get(ctx context.Context, key string) (*KVPair, error) {
pair, err := s.store.Get(key)
if err != nil {
return nil, fromLibKVStoreErr(err)
}
return &KVPair{Key: pair.Key, Value: pair.Value, LastIndex: pair.LastIndex}, nil
}
func (s *libKVStore) List(ctx context.Context, directory string) ([]*KVPair, error) {
pairs, err := s.store.List(directory)
if err != nil {
return nil, fromLibKVStoreErr(err)
}
kvPairs := make([]*KVPair, len(pairs))
for i, p := range pairs {
kvPairs[i] = &KVPair{Key: p.Key, Value: p.Value, LastIndex: p.LastIndex}
}
return kvPairs, nil
}
func (s *libKVStore) AtomicPut(ctx context.Context, key string, value []byte, previous *KVPair, options *WriteOptions) (*KVPair, error) {
var libkvPrevious *libkvstore.KVPair
if previous != nil {
libkvPrevious = &libkvstore.KVPair{Key: previous.Key, LastIndex: previous.LastIndex}
}
var libkvOptions *libkvstore.WriteOptions
if options != nil {
libkvOptions = &libkvstore.WriteOptions{TTL: options.TTL}
}
_, pair, err := s.store.AtomicPut(key, value, libkvPrevious, libkvOptions)
if err != nil {
return nil, fromLibKVStoreErr(err)
}
return &KVPair{Key: pair.Key, Value: pair.Value, LastIndex: pair.LastIndex}, nil
}
func (s *libKVStore) Delete(ctx context.Context, key string) error {
return fromLibKVStoreErr(s.store.Delete(key))
}
func (s *libKVStore) Close() error {
s.store.Close()
return nil
}
type libkvElection struct {
store *libKVStore
path string
candidate *leadership.Candidate
}
func (e *libkvElection) RunForElection() (<-chan bool, <-chan error) {
return e.candidate.RunForElection()
}
func (e *libkvElection) Stop() {
e.candidate.Stop()
}
func (e *libkvElection) Leader() (string, error) {
pair, err := e.store.Get(context.TODO(), e.path)
if err != nil {
if err != ErrKeyNotFound {
return "", err
}
return "", nil
}
return string(pair.Value), nil
}

View File

@ -15,8 +15,10 @@
package store
import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"net/url"
"path/filepath"
@ -27,23 +29,19 @@ import (
"github.com/sorintlab/stolon/common"
"github.com/sorintlab/stolon/pkg/cluster"
etcdclientv3 "github.com/coreos/etcd/clientv3"
"github.com/docker/leadership"
"github.com/docker/libkv"
kvstore "github.com/docker/libkv/store"
"github.com/docker/libkv/store/consul"
"github.com/docker/libkv/store/etcd"
libkvstore "github.com/docker/libkv/store"
)
func init() {
etcd.Register()
consul.Register()
}
// Backend represents a KV Store Backend
type Backend string
const (
CONSUL Backend = "consul"
ETCD Backend = "etcd"
ETCDV2 Backend = "etcdv2"
ETCDV3 Backend = "etcdv3"
)
const (
@ -61,6 +59,13 @@ const (
DefaultConsulEndpoints = "http://127.0.0.1:8500"
)
var (
// ErrKeyNotFound is thrown when the key is not found in the store during a Get operation
ErrKeyNotFound = errors.New("Key not found in store")
ErrKeyModified = errors.New("Unable to complete atomic operation, key modified")
ErrElectionNoLeader = errors.New("election: no leader")
)
const (
//TODO(sgotti) fix this in libkv?
// consul min ttl is 10s and libkv divides this by 2
@ -78,18 +83,45 @@ type Config struct {
SkipTLSVerify bool
}
type StoreManager struct {
clusterPath string
store kvstore.Store
// KVPair represents {Key, Value, Lastindex} tuple
type KVPair struct {
Key string
Value []byte
LastIndex uint64
}
func NewStore(cfg Config) (kvstore.Store, error) {
var kvBackend kvstore.Backend
type WriteOptions struct {
TTL time.Duration
}
type KVStore interface {
// Put a value at the specified key
Put(ctx context.Context, key string, value []byte, options *WriteOptions) error
// Get a value given its key
Get(ctx context.Context, key string) (*KVPair, error)
// List the content of a given prefix
List(ctx context.Context, directory string) ([]*KVPair, error)
// Atomic CAS operation on a single value.
// Pass previous = nil to create a new key.
AtomicPut(ctx context.Context, key string, value []byte, previous *KVPair, options *WriteOptions) (*KVPair, error)
Delete(ctx context.Context, key string) error
// Close the store connection
Close() error
}
func NewKVStore(cfg Config) (KVStore, error) {
var kvBackend libkvstore.Backend
switch cfg.Backend {
case CONSUL:
kvBackend = kvstore.CONSUL
case ETCD:
kvBackend = kvstore.ETCD
kvBackend = libkvstore.CONSUL
case ETCDV2:
kvBackend = libkvstore.ETCD
case ETCDV3:
default:
return nil, fmt.Errorf("Unknown store backend: %q", cfg.Backend)
}
@ -99,7 +131,7 @@ func NewStore(cfg Config) (kvstore.Store, error) {
switch cfg.Backend {
case CONSUL:
endpointsStr = DefaultConsulEndpoints
case ETCD:
case ETCDV2, ETCDV3:
endpointsStr = DefaultEtcdEndpoints
}
}
@ -147,59 +179,108 @@ func NewStore(cfg Config) (kvstore.Store, error) {
}
}
config := &kvstore.Config{
TLS: tlsConfig,
ConnectionTimeout: 10 * time.Second,
}
switch cfg.Backend {
case CONSUL, ETCDV2:
config := &libkvstore.Config{
TLS: tlsConfig,
ConnectionTimeout: cluster.DefaultStoreTimeout,
}
store, err := libkv.NewStore(kvBackend, addrs, config)
if err != nil {
return nil, err
store, err := libkv.NewStore(kvBackend, addrs, config)
if err != nil {
return nil, err
}
return &libKVStore{store: store}, nil
case ETCDV3:
config := etcdclientv3.Config{
Endpoints: addrs,
TLS: tlsConfig,
}
c, err := etcdclientv3.New(config)
if err != nil {
return nil, err
}
return &etcdV3Store{c: c, requestTimeout: cluster.DefaultStoreTimeout}, nil
default:
return nil, fmt.Errorf("Unknown store backend: %q", cfg.Backend)
}
return store, nil
}
func NewStoreManager(kvStore kvstore.Store, path string) *StoreManager {
return &StoreManager{
type Election interface {
// TODO(sgotti) this mimics the current docker/leadership API and the etcdv3
// implementations adapt to it. In future it could be replaced with a better
// api like the current one implemented by etcdclientv3/concurrency.
RunForElection() (<-chan bool, <-chan error)
Leader() (string, error)
Stop()
}
func NewElection(kvStore KVStore, path, candidateUID string) Election {
switch kvStore.(type) {
case *libKVStore:
s := kvStore.(*libKVStore)
electionPath := filepath.Join(path, common.SentinelLeaderKey)
candidate := leadership.NewCandidate(s.store, electionPath, candidateUID, MinTTL)
return &libkvElection{store: s, path: electionPath, candidate: candidate}
case *etcdV3Store:
etcdV3Store := kvStore.(*etcdV3Store)
return &etcdv3Election{
c: etcdV3Store.c,
path: path,
candidateUID: candidateUID,
ttl: MinTTL,
}
default:
panic("unknown kvstore")
}
}
type Store struct {
clusterPath string
store KVStore
}
func NewStore(kvStore KVStore, path string) *Store {
return &Store{
clusterPath: path,
store: kvStore,
}
}
func (e *StoreManager) AtomicPutClusterData(cd *cluster.ClusterData, previous *kvstore.KVPair) (*kvstore.KVPair, error) {
func (s *Store) AtomicPutClusterData(ctx context.Context, cd *cluster.ClusterData, previous *KVPair) (*KVPair, error) {
cdj, err := json.Marshal(cd)
if err != nil {
return nil, err
}
path := filepath.Join(e.clusterPath, clusterDataFile)
path := filepath.Join(s.clusterPath, clusterDataFile)
// Skip prev Value since LastIndex is enough for a CAS and it gives
// problem with etcd v2 api with big prev values.
var prev *kvstore.KVPair
var prev *KVPair
if previous != nil {
prev = &kvstore.KVPair{
prev = &KVPair{
Key: previous.Key,
LastIndex: previous.LastIndex,
}
}
_, pair, err := e.store.AtomicPut(path, cdj, prev, nil)
return pair, err
return s.store.AtomicPut(ctx, path, cdj, prev, nil)
}
func (e *StoreManager) PutClusterData(cd *cluster.ClusterData) error {
func (s *Store) PutClusterData(ctx context.Context, cd *cluster.ClusterData) error {
cdj, err := json.Marshal(cd)
if err != nil {
return err
}
path := filepath.Join(e.clusterPath, clusterDataFile)
return e.store.Put(path, cdj, nil)
path := filepath.Join(s.clusterPath, clusterDataFile)
return s.store.Put(ctx, path, cdj, nil)
}
func (e *StoreManager) GetClusterData() (*cluster.ClusterData, *kvstore.KVPair, error) {
func (s *Store) GetClusterData(ctx context.Context) (*cluster.ClusterData, *KVPair, error) {
var cd *cluster.ClusterData
path := filepath.Join(e.clusterPath, clusterDataFile)
pair, err := e.store.Get(path)
path := filepath.Join(s.clusterPath, clusterDataFile)
pair, err := s.store.Get(ctx, path)
if err != nil {
if err != kvstore.ErrKeyNotFound {
if err != ErrKeyNotFound {
return nil, nil, err
}
return nil, nil, nil
@ -210,7 +291,7 @@ func (e *StoreManager) GetClusterData() (*cluster.ClusterData, *kvstore.KVPair,
return cd, pair, nil
}
func (e *StoreManager) SetKeeperInfo(id string, ms *cluster.KeeperInfo, ttl time.Duration) error {
func (s *Store) SetKeeperInfo(ctx context.Context, id string, ms *cluster.KeeperInfo, ttl time.Duration) error {
msj, err := json.Marshal(ms)
if err != nil {
return err
@ -218,17 +299,17 @@ func (e *StoreManager) SetKeeperInfo(id string, ms *cluster.KeeperInfo, ttl time
if ttl < MinTTL {
ttl = MinTTL
}
return e.store.Put(filepath.Join(e.clusterPath, keepersInfoDir, id), msj, &kvstore.WriteOptions{TTL: ttl})
return s.store.Put(ctx, filepath.Join(s.clusterPath, keepersInfoDir, id), msj, &WriteOptions{TTL: ttl})
}
func (e *StoreManager) GetKeeperInfo(id string) (*cluster.KeeperInfo, bool, error) {
func (s *Store) GetKeeperInfo(ctx context.Context, id string) (*cluster.KeeperInfo, bool, error) {
if id == "" {
return nil, false, fmt.Errorf("empty keeper id")
}
var keeper cluster.KeeperInfo
pair, err := e.store.Get(filepath.Join(e.clusterPath, keepersInfoDir, id))
pair, err := s.store.Get(ctx, filepath.Join(s.clusterPath, keepersInfoDir, id))
if err != nil {
if err != kvstore.ErrKeyNotFound {
if err != ErrKeyNotFound {
return nil, false, err
}
return nil, false, nil
@ -239,11 +320,11 @@ func (e *StoreManager) GetKeeperInfo(id string) (*cluster.KeeperInfo, bool, erro
return &keeper, true, nil
}
func (e *StoreManager) GetKeepersInfo() (cluster.KeepersInfo, error) {
func (s *Store) GetKeepersInfo(ctx context.Context) (cluster.KeepersInfo, error) {
keepers := cluster.KeepersInfo{}
pairs, err := e.store.List(filepath.Join(e.clusterPath, keepersInfoDir))
pairs, err := s.store.List(ctx, filepath.Join(s.clusterPath, keepersInfoDir))
if err != nil {
if err != kvstore.ErrKeyNotFound {
if err != ErrKeyNotFound {
return nil, err
}
return keepers, nil
@ -259,7 +340,7 @@ func (e *StoreManager) GetKeepersInfo() (cluster.KeepersInfo, error) {
return keepers, nil
}
func (e *StoreManager) SetSentinelInfo(si *cluster.SentinelInfo, ttl time.Duration) error {
func (s *Store) SetSentinelInfo(ctx context.Context, si *cluster.SentinelInfo, ttl time.Duration) error {
sij, err := json.Marshal(si)
if err != nil {
return err
@ -267,17 +348,17 @@ func (e *StoreManager) SetSentinelInfo(si *cluster.SentinelInfo, ttl time.Durati
if ttl < MinTTL {
ttl = MinTTL
}
return e.store.Put(filepath.Join(e.clusterPath, sentinelsInfoDir, si.UID), sij, &kvstore.WriteOptions{TTL: ttl})
return s.store.Put(ctx, filepath.Join(s.clusterPath, sentinelsInfoDir, si.UID), sij, &WriteOptions{TTL: ttl})
}
func (e *StoreManager) GetSentinelInfo(id string) (*cluster.SentinelInfo, bool, error) {
func (s *Store) GetSentinelInfo(ctx context.Context, id string) (*cluster.SentinelInfo, bool, error) {
if id == "" {
return nil, false, fmt.Errorf("empty sentinel id")
}
var si cluster.SentinelInfo
pair, err := e.store.Get(filepath.Join(e.clusterPath, sentinelsInfoDir, id))
pair, err := s.store.Get(ctx, filepath.Join(s.clusterPath, sentinelsInfoDir, id))
if err != nil {
if err != kvstore.ErrKeyNotFound {
if err != ErrKeyNotFound {
return nil, false, err
}
return nil, false, nil
@ -289,11 +370,11 @@ func (e *StoreManager) GetSentinelInfo(id string) (*cluster.SentinelInfo, bool,
return &si, true, nil
}
func (e *StoreManager) GetSentinelsInfo() (cluster.SentinelsInfo, error) {
func (s *Store) GetSentinelsInfo(ctx context.Context) (cluster.SentinelsInfo, error) {
ssi := cluster.SentinelsInfo{}
pairs, err := e.store.List(filepath.Join(e.clusterPath, sentinelsInfoDir))
pairs, err := s.store.List(ctx, filepath.Join(s.clusterPath, sentinelsInfoDir))
if err != nil {
if err != kvstore.ErrKeyNotFound {
if err != ErrKeyNotFound {
return nil, err
}
return ssi, nil
@ -309,10 +390,10 @@ func (e *StoreManager) GetSentinelsInfo() (cluster.SentinelsInfo, error) {
return ssi, nil
}
func (e *StoreManager) GetLeaderSentinelId() (string, error) {
pair, err := e.store.Get(filepath.Join(e.clusterPath, sentinelLeaderKey))
func (s *Store) GetLeaderSentinelId(ctx context.Context) (string, error) {
pair, err := s.store.Get(ctx, filepath.Join(s.clusterPath, sentinelLeaderKey))
if err != nil {
if err != kvstore.ErrKeyNotFound {
if err != ErrKeyNotFound {
return "", err
}
return "", nil
@ -320,7 +401,7 @@ func (e *StoreManager) GetLeaderSentinelId() (string, error) {
return string(pair.Value), nil
}
func (e *StoreManager) SetProxyInfo(pi *cluster.ProxyInfo, ttl time.Duration) error {
func (s *Store) SetProxyInfo(ctx context.Context, pi *cluster.ProxyInfo, ttl time.Duration) error {
pij, err := json.Marshal(pi)
if err != nil {
return err
@ -328,17 +409,17 @@ func (e *StoreManager) SetProxyInfo(pi *cluster.ProxyInfo, ttl time.Duration) er
if ttl < MinTTL {
ttl = MinTTL
}
return e.store.Put(filepath.Join(e.clusterPath, proxiesInfoDir, pi.UID), pij, &kvstore.WriteOptions{TTL: ttl})
return s.store.Put(ctx, filepath.Join(s.clusterPath, proxiesInfoDir, pi.UID), pij, &WriteOptions{TTL: ttl})
}
func (e *StoreManager) GetProxyInfo(id string) (*cluster.ProxyInfo, bool, error) {
func (s *Store) GetProxyInfo(ctx context.Context, id string) (*cluster.ProxyInfo, bool, error) {
if id == "" {
return nil, false, fmt.Errorf("empty proxy id")
}
var pi cluster.ProxyInfo
pair, err := e.store.Get(filepath.Join(e.clusterPath, proxiesInfoDir, id))
pair, err := s.store.Get(ctx, filepath.Join(s.clusterPath, proxiesInfoDir, id))
if err != nil {
if err != kvstore.ErrKeyNotFound {
if err != ErrKeyNotFound {
return nil, false, err
}
return nil, false, nil
@ -350,11 +431,11 @@ func (e *StoreManager) GetProxyInfo(id string) (*cluster.ProxyInfo, bool, error)
return &pi, true, nil
}
func (e *StoreManager) GetProxiesInfo() (cluster.ProxiesInfo, error) {
func (s *Store) GetProxiesInfo(ctx context.Context) (cluster.ProxiesInfo, error) {
psi := cluster.ProxiesInfo{}
pairs, err := e.store.List(filepath.Join(e.clusterPath, proxiesInfoDir))
pairs, err := s.store.List(ctx, filepath.Join(s.clusterPath, proxiesInfoDir))
if err != nil {
if err != kvstore.ErrKeyNotFound {
if err != ErrKeyNotFound {
return nil, err
}
return psi, nil

View File

@ -12,8 +12,8 @@ fi
# Install etcd
mkdir etcd
pushd etcd
curl -L https://github.com/coreos/etcd/releases/download/v3.1.8/etcd-v3.1.8-linux-amd64.tar.gz -o etcd-v3.1.8-linux-amd64.tar.gz
tar xzvf etcd-v3.1.8-linux-amd64.tar.gz
curl -L https://github.com/coreos/etcd/releases/download/v3.2.11/etcd-v3.2.11-linux-amd64.tar.gz -o etcd-v3.2.11-linux-amd64.tar.gz
tar xzvf etcd-v3.2.11-linux-amd64.tar.gz
popd
# Install consul
@ -34,7 +34,7 @@ sudo apt-get -y install postgresql-9.5 postgresql-9.6 postgresql-10
sudo -E CGO_ENABLED=0 go install -a -installsuffix cgo std
# Run tests
export ETCD_BIN="${PWD}/etcd/etcd-v3.1.8-linux-amd64/etcd"
export ETCD_BIN="${PWD}/etcd/etcd-v3.2.11-linux-amd64/etcd"
export CONSUL_BIN="${PWD}/consul/consul"
OLDPATH=$PATH

2
test
View File

@ -97,7 +97,7 @@ if [ -n "$INTEGRATION" ]; then
export STSENTINEL_BIN=${BINDIR}/stolon-sentinel
export STPROXY_BIN=${BINDIR}/stolon-proxy
export STCTL_BIN=${BINDIR}/stolonctl
if [ "${STOLON_TEST_STORE_BACKEND}" == "etcd" ]; then
if [ "${STOLON_TEST_STORE_BACKEND}" == "etcd" -o "${STOLON_TEST_STORE_BACKEND}" == "etcdv2" -o "${STOLON_TEST_STORE_BACKEND}" == "etcdv3" ]; then
if [ -z ${ETCD_BIN} ]; then
if [ -z $(which etcd) ]; then
echo "cannot find etcd in PATH and ETCD_BIN environment variable not defined"

View File

@ -55,7 +55,7 @@ func TestServerParameters(t *testing.T) {
storePath := filepath.Join(common.StoreBasePath, clusterName)
sm := store.NewStoreManager(tstore.store, storePath)
sm := store.NewStore(tstore.store, storePath)
initialClusterSpec := &cluster.ClusterSpec{
InitMode: cluster.ClusterInitModeP(cluster.ClusterInitModeNew),

View File

@ -15,6 +15,7 @@
package integration
import (
"context"
"fmt"
"io/ioutil"
"os"
@ -70,7 +71,7 @@ func TestInitWithMultipleKeepers(t *testing.T) {
storePath := filepath.Join(common.StoreBasePath, clusterName)
sm := store.NewStoreManager(tstore.store, storePath)
sm := store.NewStore(tstore.store, storePath)
initialClusterSpec := &cluster.ClusterSpec{
InitMode: cluster.ClusterInitModeP(cluster.ClusterInitModeNew),
@ -271,7 +272,7 @@ func shutdown(tks map[string]*TestKeeper, tss map[string]*TestSentinel, tp *Test
}
}
func waitKeeperReady(t *testing.T, sm *store.StoreManager, keeper *TestKeeper) {
func waitKeeperReady(t *testing.T, sm *store.Store, keeper *TestKeeper) {
if err := WaitClusterDataKeeperInitialized(keeper.uid, sm, 60*time.Second); err != nil {
t.Fatalf("unexpected err: %v", err)
}
@ -280,7 +281,7 @@ func waitKeeperReady(t *testing.T, sm *store.StoreManager, keeper *TestKeeper) {
}
}
func waitMasterStandbysReady(t *testing.T, sm *store.StoreManager, tks testKeepers) (master *TestKeeper, standbys []*TestKeeper) {
func waitMasterStandbysReady(t *testing.T, sm *store.Store, tks testKeepers) (master *TestKeeper, standbys []*TestKeeper) {
// Wait for normal cluster phase (master ready)
masterUID, err := WaitClusterDataWithMaster(sm, 60*time.Second)
if err != nil {
@ -314,7 +315,7 @@ func testMasterStandby(t *testing.T, syncRepl bool) {
defer shutdown(tks, tss, tp, tstore)
storePath := filepath.Join(common.StoreBasePath, clusterName)
sm := store.NewStoreManager(tstore.store, storePath)
sm := store.NewStore(tstore.store, storePath)
master, standbys := waitMasterStandbysReady(t, sm, tks)
standby := standbys[0]
@ -378,7 +379,7 @@ func testFailover(t *testing.T, syncRepl bool, standbyCluster bool) {
defer shutdown(tks, tss, tp, tstore)
storePath := filepath.Join(common.StoreBasePath, clusterName)
sm := store.NewStoreManager(tstore.store, storePath)
sm := store.NewStore(tstore.store, storePath)
master, standbys := waitMasterStandbysReady(t, sm, tks)
standby := standbys[0]
@ -486,7 +487,7 @@ func testFailoverFailed(t *testing.T, syncRepl bool, standbyCluster bool) {
defer shutdown(tks, tss, tp, tstore)
storePath := filepath.Join(common.StoreBasePath, clusterName)
sm := store.NewStoreManager(tstore.store, storePath)
sm := store.NewStore(tstore.store, storePath)
master, standbys := waitMasterStandbysReady(t, sm, tks)
standby := standbys[0]
@ -585,7 +586,7 @@ func testFailoverTooMuchLag(t *testing.T, standbyCluster bool) {
defer shutdown(tks, tss, tp, tstore)
storePath := filepath.Join(common.StoreBasePath, clusterName)
sm := store.NewStoreManager(tstore.store, storePath)
sm := store.NewStore(tstore.store, storePath)
master, standbys := waitMasterStandbysReady(t, sm, tks)
standby := standbys[0]
@ -663,7 +664,7 @@ func testOldMasterRestart(t *testing.T, syncRepl, usePgrewind bool, standbyClust
storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port)
storePath := filepath.Join(common.StoreBasePath, clusterName)
sm := store.NewStoreManager(tstore.store, storePath)
sm := store.NewStore(tstore.store, storePath)
master, standbys := waitMasterStandbysReady(t, sm, tks)
@ -796,7 +797,7 @@ func testPartition1(t *testing.T, syncRepl, usePgrewind bool, standbyCluster boo
storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port)
storePath := filepath.Join(common.StoreBasePath, clusterName)
sm := store.NewStoreManager(tstore.store, storePath)
sm := store.NewStore(tstore.store, storePath)
master, standbys := waitMasterStandbysReady(t, sm, tks)
@ -932,7 +933,7 @@ func testTimelineFork(t *testing.T, syncRepl, usePgrewind bool) {
storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port)
storePath := filepath.Join(common.StoreBasePath, clusterName)
sm := store.NewStoreManager(tstore.store, storePath)
sm := store.NewStore(tstore.store, storePath)
master, standbys := waitMasterStandbysReady(t, sm, tks)
@ -1111,7 +1112,7 @@ func testMasterChangedAddress(t *testing.T, standbyCluster bool) {
defer shutdown(tks, tss, tp, tstore)
storePath := filepath.Join(common.StoreBasePath, clusterName)
sm := store.NewStoreManager(tstore.store, storePath)
sm := store.NewStore(tstore.store, storePath)
master, standbys := waitMasterStandbysReady(t, sm, tks)
@ -1198,7 +1199,7 @@ func TestFailedStandby(t *testing.T) {
defer shutdown(tks, tss, tp, tstore)
storePath := filepath.Join(common.StoreBasePath, clusterName)
sm := store.NewStoreManager(tstore.store, storePath)
sm := store.NewStore(tstore.store, storePath)
// Wait for clusterView containing a master
masterUID, err := WaitClusterDataWithMaster(sm, 30*time.Second)
@ -1226,7 +1227,7 @@ func TestFailedStandby(t *testing.T) {
t.Fatalf("expected 2 DBs in cluster data: %v", err)
}
cd, _, err := sm.GetClusterData()
cd, _, err := sm.GetClusterData(context.TODO())
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
@ -1290,7 +1291,7 @@ func TestLoweredMaxStandbysPerSender(t *testing.T) {
storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port)
storePath := filepath.Join(common.StoreBasePath, clusterName)
sm := store.NewStoreManager(tstore.store, storePath)
sm := store.NewStore(tstore.store, storePath)
// Wait for clusterView containing a master
masterUID, err := WaitClusterDataWithMaster(sm, 30*time.Second)
@ -1357,7 +1358,7 @@ func TestKeeperRemoval(t *testing.T) {
storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port)
storePath := filepath.Join(common.StoreBasePath, clusterName)
sm := store.NewStoreManager(tstore.store, storePath)
sm := store.NewStore(tstore.store, storePath)
master, standbys := waitMasterStandbysReady(t, sm, tks)
standby1 := standbys[0]
@ -1460,7 +1461,7 @@ func testKeeperRemovalStolonCtl(t *testing.T, syncRepl bool) {
storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port)
storePath := filepath.Join(common.StoreBasePath, clusterName)
sm := store.NewStoreManager(tstore.store, storePath)
sm := store.NewStore(tstore.store, storePath)
master, standbys := waitMasterStandbysReady(t, sm, tks)
@ -1505,7 +1506,7 @@ func testKeeperRemovalStolonCtl(t *testing.T, syncRepl bool) {
}
// get current stanbdys[0] db uid
cd, _, err := sm.GetClusterData()
cd, _, err := sm.GetClusterData(context.TODO())
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
@ -1566,7 +1567,7 @@ func TestStandbyCantSync(t *testing.T) {
defer shutdown(tks, tss, tp, tstore)
storePath := filepath.Join(common.StoreBasePath, clusterName)
sm := store.NewStoreManager(tstore.store, storePath)
sm := store.NewStore(tstore.store, storePath)
master, standbys := waitMasterStandbysReady(t, sm, tks)
@ -1616,7 +1617,7 @@ func TestStandbyCantSync(t *testing.T) {
}
// get current stanbdys[0] db uid
cd, _, err := sm.GetClusterData()
cd, _, err := sm.GetClusterData(context.TODO())
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
@ -1645,7 +1646,7 @@ func TestStandbyCantSync(t *testing.T) {
// check that the current stanbdys[0] db uid is different. This means the
// sentinel found that standbys[0] won't sync due to missing wals and asked
// the keeper to resync (defining e new db in the cluster data)
cd, _, err = sm.GetClusterData()
cd, _, err = sm.GetClusterData(context.TODO())
if err != nil {
t.Fatalf("unexpected err: %v", err)
}

View File

@ -15,6 +15,7 @@
package integration
import (
"context"
"fmt"
"io/ioutil"
"os"
@ -106,7 +107,7 @@ func testInitNew(t *testing.T, merge bool) {
storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port)
storePath := filepath.Join(common.StoreBasePath, clusterName)
sm := store.NewStoreManager(tstore.store, storePath)
sm := store.NewStore(tstore.store, storePath)
initialClusterSpec := &cluster.ClusterSpec{
InitMode: cluster.ClusterInitModeP(cluster.ClusterInitModeNew),
@ -144,7 +145,7 @@ func testInitNew(t *testing.T, merge bool) {
t.Fatalf("unexpected err: %v", err)
}
cd, _, err := sm.GetClusterData()
cd, _, err := sm.GetClusterData(context.TODO())
// max_connection should be set by initdb
_, ok := cd.Cluster.Spec.PGParameters["max_connections"]
if merge && !ok {
@ -182,7 +183,7 @@ func testInitExisting(t *testing.T, merge bool) {
storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port)
storePath := filepath.Join(common.StoreBasePath, clusterName)
sm := store.NewStoreManager(tstore.store, storePath)
sm := store.NewStore(tstore.store, storePath)
initialClusterSpec := &cluster.ClusterSpec{
InitMode: cluster.ClusterInitModeP(cluster.ClusterInitModeNew),
@ -283,7 +284,7 @@ func testInitExisting(t *testing.T, merge bool) {
t.Fatalf("expected archive_mode empty")
}
cd, _, err := sm.GetClusterData()
cd, _, err := sm.GetClusterData(context.TODO())
// max_connection should be set by initdb
v, ok = cd.Cluster.Spec.PGParameters["archive_mode"]
if merge && v != "on" {
@ -328,7 +329,7 @@ func TestInitUsers(t *testing.T) {
clusterName = uuid.NewV4().String()
storePath := filepath.Join(common.StoreBasePath, clusterName)
sm := store.NewStoreManager(tstore.store, storePath)
sm := store.NewStore(tstore.store, storePath)
initialClusterSpec := &cluster.ClusterSpec{
InitMode: cluster.ClusterInitModeP(cluster.ClusterInitModeNew),
@ -371,7 +372,7 @@ func TestInitUsers(t *testing.T) {
clusterName = uuid.NewV4().String()
storePath = filepath.Join(common.StoreBasePath, clusterName)
sm = store.NewStoreManager(tstore.store, storePath)
sm = store.NewStore(tstore.store, storePath)
ts2, err := NewTestSentinel(t, dir, clusterName, tstore.storeBackend, storeEndpoints, fmt.Sprintf("--initial-cluster-spec=%s", initialClusterSpecFile))
if err != nil {
@ -419,7 +420,7 @@ func TestInitialClusterSpec(t *testing.T) {
storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port)
storePath := filepath.Join(common.StoreBasePath, clusterName)
sm := store.NewStoreManager(tstore.store, storePath)
sm := store.NewStore(tstore.store, storePath)
initialClusterSpec := &cluster.ClusterSpec{
InitMode: cluster.ClusterInitModeP(cluster.ClusterInitModeNew),
@ -447,7 +448,7 @@ func TestInitialClusterSpec(t *testing.T) {
t.Fatal("expected cluster in initializing phase")
}
cd, _, err := sm.GetClusterData()
cd, _, err := sm.GetClusterData(context.TODO())
if err != nil {
t.Fatalf("unexpected err: %v", err)
}

View File

@ -15,6 +15,7 @@
package integration
import (
"context"
"fmt"
"io/ioutil"
"os"
@ -56,7 +57,7 @@ func TestPITR(t *testing.T) {
storePath := filepath.Join(common.StoreBasePath, clusterName)
sm := store.NewStoreManager(tstore.store, storePath)
sm := store.NewStore(tstore.store, storePath)
initialClusterSpec := &cluster.ClusterSpec{
InitMode: cluster.ClusterInitModeP(cluster.ClusterInitModeNew),
@ -130,11 +131,11 @@ func TestPITR(t *testing.T) {
ts.Stop()
// Delete the current cluster data
if err := tstore.store.Delete(filepath.Join(storePath, "clusterdata")); err != nil {
if err := tstore.store.Delete(context.TODO(), filepath.Join(storePath, "clusterdata")); err != nil {
t.Fatalf("unexpected err: %v", err)
}
// Delete sentinel leader key to just speedup new election
if err := tstore.store.Delete(filepath.Join(storePath, common.SentinelLeaderKey)); err != nil {
if err := tstore.store.Delete(context.TODO(), filepath.Join(storePath, common.SentinelLeaderKey)); err != nil && err != store.ErrKeyNotFound {
t.Fatalf("unexpected err: %v", err)
}

View File

@ -15,6 +15,7 @@
package integration
import (
"context"
"fmt"
"io/ioutil"
"os"
@ -78,7 +79,7 @@ func TestProxyListening(t *testing.T) {
storePath := filepath.Join(common.StoreBasePath, clusterName)
sm := store.NewStoreManager(tstore.store, storePath)
sm := store.NewStore(tstore.store, storePath)
cd := &cluster.ClusterData{
FormatVersion: cluster.CurrentCDFormatVersion,
@ -126,7 +127,7 @@ func TestProxyListening(t *testing.T) {
},
},
}
pair, err := sm.AtomicPutClusterData(cd, nil)
pair, err := sm.AtomicPutClusterData(context.TODO(), cd, nil)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
@ -196,7 +197,7 @@ func TestProxyListening(t *testing.T) {
t.Logf("test proxyConf removed. Should continue listening")
// remove proxyConf
cd.Proxy.Spec.MasterDBUID = ""
pair, err = sm.AtomicPutClusterData(cd, pair)
pair, err = sm.AtomicPutClusterData(context.TODO(), cd, pair)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
@ -209,7 +210,7 @@ func TestProxyListening(t *testing.T) {
t.Logf("test proxyConf restored. Should continue listening")
// Set proxyConf again
cd.Proxy.Spec.MasterDBUID = "01"
pair, err = sm.AtomicPutClusterData(cd, pair)
pair, err = sm.AtomicPutClusterData(context.TODO(), cd, pair)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
@ -221,7 +222,7 @@ func TestProxyListening(t *testing.T) {
t.Logf("test clusterView removed. Should continue listening")
// remove whole clusterview
_, err = sm.AtomicPutClusterData(nil, pair)
_, err = sm.AtomicPutClusterData(context.TODO(), nil, pair)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}

View File

@ -46,7 +46,7 @@ func TestInitStandbyCluster(t *testing.T) {
primaryStoreEndpoints := fmt.Sprintf("%s:%s", ptstore.listenAddress, ptstore.port)
pStorePath := filepath.Join(common.StoreBasePath, primaryClusterName)
psm := store.NewStoreManager(ptstore.store, pStorePath)
psm := store.NewStore(ptstore.store, pStorePath)
initialClusterSpec := &cluster.ClusterSpec{
InitMode: cluster.ClusterInitModeP(cluster.ClusterInitModeNew),
@ -95,7 +95,7 @@ func TestInitStandbyCluster(t *testing.T) {
storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port)
storePath := filepath.Join(common.StoreBasePath, clusterName)
sm := store.NewStoreManager(tstore.store, storePath)
sm := store.NewStore(tstore.store, storePath)
pgpass, err := ioutil.TempFile(dir, "pgpass")
if err != nil {
@ -175,7 +175,7 @@ func TestPromoteStandbyCluster(t *testing.T) {
primaryStoreEndpoints := fmt.Sprintf("%s:%s", ptstore.listenAddress, ptstore.port)
pStorePath := filepath.Join(common.StoreBasePath, primaryClusterName)
psm := store.NewStoreManager(ptstore.store, pStorePath)
psm := store.NewStore(ptstore.store, pStorePath)
initialClusterSpec := &cluster.ClusterSpec{
InitMode: cluster.ClusterInitModeP(cluster.ClusterInitModeNew),
@ -224,7 +224,7 @@ func TestPromoteStandbyCluster(t *testing.T) {
storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port)
storePath := filepath.Join(common.StoreBasePath, clusterName)
sm := store.NewStoreManager(tstore.store, storePath)
sm := store.NewStore(tstore.store, storePath)
pgpass, err := ioutil.TempFile(dir, "pgpass")
if err != nil {

View File

@ -16,6 +16,7 @@ package integration
import (
"bufio"
"context"
"database/sql"
"encoding/json"
"fmt"
@ -38,7 +39,6 @@ import (
"github.com/sorintlab/stolon/pkg/store"
"github.com/sorintlab/stolon/pkg/util"
kvstore "github.com/docker/libkv/store"
_ "github.com/lib/pq"
"github.com/satori/go.uuid"
"github.com/sgotti/gexpect"
@ -693,22 +693,25 @@ type TestStore struct {
Process
listenAddress string
port string
store kvstore.Store
store store.KVStore
storeBackend store.Backend
}
func NewTestStore(t *testing.T, dir string, a ...string) (*TestStore, error) {
storeBackend := store.Backend(os.Getenv("STOLON_TEST_STORE_BACKEND"))
switch storeBackend {
case store.CONSUL:
case "consul":
return NewTestConsul(t, dir, a...)
case store.ETCD:
return NewTestEtcd(t, dir, a...)
case "etcd":
storeBackend = "etcdv2"
fallthrough
case "etcdv2", "etcdv3":
return NewTestEtcd(t, dir, storeBackend, a...)
}
return nil, fmt.Errorf("wrong store backend")
}
func NewTestEtcd(t *testing.T, dir string, a ...string) (*TestStore, error) {
func NewTestEtcd(t *testing.T, dir string, backend store.Backend, a ...string) (*TestStore, error) {
u := uuid.NewV4()
uid := fmt.Sprintf("%x", u[:4])
@ -736,10 +739,10 @@ func NewTestEtcd(t *testing.T, dir string, a ...string) (*TestStore, error) {
storeEndpoints := fmt.Sprintf("%s:%s", listenAddress, port)
storeConfig := store.Config{
Backend: store.ETCD,
Backend: store.Backend(backend),
Endpoints: storeEndpoints,
}
kvstore, err := store.NewStore(storeConfig)
kvstore, err := store.NewKVStore(storeConfig)
if err != nil {
return nil, fmt.Errorf("cannot create store: %v", err)
}
@ -760,7 +763,7 @@ func NewTestEtcd(t *testing.T, dir string, a ...string) (*TestStore, error) {
listenAddress: listenAddress,
port: port,
store: kvstore,
storeBackend: store.ETCD,
storeBackend: backend,
}
return tstore, nil
}
@ -825,7 +828,7 @@ func NewTestConsul(t *testing.T, dir string, a ...string) (*TestStore, error) {
Backend: store.CONSUL,
Endpoints: storeEndpoints,
}
kvstore, err := store.NewStore(storeConfig)
kvstore, err := store.NewKVStore(storeConfig)
if err != nil {
return nil, fmt.Errorf("cannot create store: %v", err)
}
@ -854,9 +857,9 @@ func NewTestConsul(t *testing.T, dir string, a ...string) (*TestStore, error) {
func (ts *TestStore) WaitUp(timeout time.Duration) error {
start := time.Now()
for time.Now().Add(-timeout).Before(start) {
_, err := ts.store.Get("anykey")
_, err := ts.store.Get(context.TODO(), "anykey")
ts.t.Logf("err: %v", err)
if err != nil && err == kvstore.ErrKeyNotFound {
if err != nil && err == store.ErrKeyNotFound {
return nil
}
if err == nil {
@ -871,8 +874,8 @@ func (ts *TestStore) WaitUp(timeout time.Duration) error {
func (ts *TestStore) WaitDown(timeout time.Duration) error {
start := time.Now()
for time.Now().Add(-timeout).Before(start) {
_, err := ts.store.Get("anykey")
if err != nil && err != kvstore.ErrKeyNotFound {
_, err := ts.store.Get(context.TODO(), "anykey")
if err != nil && err != store.ErrKeyNotFound {
return nil
}
time.Sleep(sleepInterval)
@ -881,10 +884,10 @@ func (ts *TestStore) WaitDown(timeout time.Duration) error {
return fmt.Errorf("timeout")
}
func WaitClusterDataWithMaster(e *store.StoreManager, timeout time.Duration) (string, error) {
func WaitClusterDataWithMaster(e *store.Store, timeout time.Duration) (string, error) {
start := time.Now()
for time.Now().Add(-timeout).Before(start) {
cd, _, err := e.GetClusterData()
cd, _, err := e.GetClusterData(context.TODO())
if err != nil || cd == nil {
goto end
}
@ -897,10 +900,10 @@ func WaitClusterDataWithMaster(e *store.StoreManager, timeout time.Duration) (st
return "", fmt.Errorf("timeout")
}
func WaitClusterDataMaster(master string, e *store.StoreManager, timeout time.Duration) error {
func WaitClusterDataMaster(master string, e *store.Store, timeout time.Duration) error {
start := time.Now()
for time.Now().Add(-timeout).Before(start) {
cd, _, err := e.GetClusterData()
cd, _, err := e.GetClusterData(context.TODO())
if err != nil || cd == nil {
goto end
}
@ -915,10 +918,10 @@ func WaitClusterDataMaster(master string, e *store.StoreManager, timeout time.Du
return fmt.Errorf("timeout")
}
func WaitClusterDataKeeperInitialized(keeperUID string, e *store.StoreManager, timeout time.Duration) error {
func WaitClusterDataKeeperInitialized(keeperUID string, e *store.Store, timeout time.Duration) error {
start := time.Now()
for time.Now().Add(-timeout).Before(start) {
cd, _, err := e.GetClusterData()
cd, _, err := e.GetClusterData(context.TODO())
if err != nil || cd == nil {
goto end
}
@ -939,11 +942,11 @@ func WaitClusterDataKeeperInitialized(keeperUID string, e *store.StoreManager, t
// WaitClusterDataSynchronousStandbys waits for:
// * synchronous standby defined in masterdb spec
// * synchronous standby reported from masterdb status
func WaitClusterDataSynchronousStandbys(synchronousStandbys []string, e *store.StoreManager, timeout time.Duration) error {
func WaitClusterDataSynchronousStandbys(synchronousStandbys []string, e *store.Store, timeout time.Duration) error {
sort.Sort(sort.StringSlice(synchronousStandbys))
start := time.Now()
for time.Now().Add(-timeout).Before(start) {
cd, _, err := e.GetClusterData()
cd, _, err := e.GetClusterData(context.TODO())
if err != nil || cd == nil {
goto end
}
@ -982,10 +985,10 @@ func WaitClusterDataSynchronousStandbys(synchronousStandbys []string, e *store.S
return fmt.Errorf("timeout")
}
func WaitClusterPhase(e *store.StoreManager, phase cluster.ClusterPhase, timeout time.Duration) error {
func WaitClusterPhase(e *store.Store, phase cluster.ClusterPhase, timeout time.Duration) error {
start := time.Now()
for time.Now().Add(-timeout).Before(start) {
cd, _, err := e.GetClusterData()
cd, _, err := e.GetClusterData(context.TODO())
if err != nil || cd == nil {
goto end
}
@ -998,10 +1001,10 @@ func WaitClusterPhase(e *store.StoreManager, phase cluster.ClusterPhase, timeout
return fmt.Errorf("timeout")
}
func WaitNumDBs(e *store.StoreManager, n int, timeout time.Duration) error {
func WaitNumDBs(e *store.Store, n int, timeout time.Duration) error {
start := time.Now()
for time.Now().Add(-timeout).Before(start) {
cd, _, err := e.GetClusterData()
cd, _, err := e.GetClusterData(context.TODO())
if err != nil || cd == nil {
goto end
}
@ -1014,10 +1017,10 @@ func WaitNumDBs(e *store.StoreManager, n int, timeout time.Duration) error {
return fmt.Errorf("timeout")
}
func WaitStandbyKeeper(e *store.StoreManager, keeperUID string, timeout time.Duration) error {
func WaitStandbyKeeper(e *store.Store, keeperUID string, timeout time.Duration) error {
start := time.Now()
for time.Now().Add(-timeout).Before(start) {
cd, _, err := e.GetClusterData()
cd, _, err := e.GetClusterData(context.TODO())
if err != nil || cd == nil {
goto end
}
@ -1036,10 +1039,10 @@ func WaitStandbyKeeper(e *store.StoreManager, keeperUID string, timeout time.Dur
return fmt.Errorf("timeout")
}
func WaitClusterDataKeepers(keepersUIDs []string, e *store.StoreManager, timeout time.Duration) error {
func WaitClusterDataKeepers(keepersUIDs []string, e *store.Store, timeout time.Duration) error {
start := time.Now()
for time.Now().Add(-timeout).Before(start) {
cd, _, err := e.GetClusterData()
cd, _, err := e.GetClusterData(context.TODO())
if err != nil || cd == nil {
goto end
}
@ -1061,12 +1064,12 @@ func WaitClusterDataKeepers(keepersUIDs []string, e *store.StoreManager, timeout
// WaitClusterSyncedXLogPos waits for all the specified keepers to have the same
// reported XLogPos
func WaitClusterSyncedXLogPos(keepersUIDs []string, e *store.StoreManager, timeout time.Duration) error {
func WaitClusterSyncedXLogPos(keepersUIDs []string, e *store.Store, timeout time.Duration) error {
start := time.Now()
for time.Now().Add(-timeout).Before(start) {
c := 0
curXLogPos := uint64(0)
cd, _, err := e.GetClusterData()
cd, _, err := e.GetClusterData(context.TODO())
if err != nil || cd == nil {
goto end
}