tests: start etcd inside tests.

Instead of relying on an already running etcd, start it in the tests.
This commit is contained in:
Simone Gotti 2015-10-28 11:11:52 +01:00
parent a4cb5fa462
commit ea99cdf03a
5 changed files with 239 additions and 27 deletions

View File

@ -14,9 +14,8 @@ mkdir etcd
cd etcd cd etcd
curl -L https://github.com/coreos/etcd/releases/download/v2.2.1/etcd-v2.2.1-linux-amd64.tar.gz -o etcd-v2.2.1-linux-amd64.tar.gz curl -L https://github.com/coreos/etcd/releases/download/v2.2.1/etcd-v2.2.1-linux-amd64.tar.gz -o etcd-v2.2.1-linux-amd64.tar.gz
tar xzvf etcd-v2.2.1-linux-amd64.tar.gz tar xzvf etcd-v2.2.1-linux-amd64.tar.gz
cd etcd-v2.2.1-linux-amd64 cd ../
start-stop-daemon -b --start --exec $PWD/etcd -- --data-dir=$PWD/
cd ../../
# Run tests # Run tests
export ETCD_BIN="${PWD}/etcd/etcd-v2.2.1-linux-amd64/etcd"
export PATH=/usr/lib/postgresql/9.4/bin/:$PATH ; INTEGRATION=1 ./test export PATH=/usr/lib/postgresql/9.4/bin/:$PATH ; INTEGRATION=1 ./test

9
test
View File

@ -92,6 +92,15 @@ if [ -n "$INTEGRATION" ]; then
export STKEEPER_BIN=${BINDIR}/stolon-keeper export STKEEPER_BIN=${BINDIR}/stolon-keeper
export STSENTINEL_BIN=${BINDIR}/stolon-sentinel export STSENTINEL_BIN=${BINDIR}/stolon-sentinel
export STPROXY_BIN=${BINDIR}/stolon-proxy export STPROXY_BIN=${BINDIR}/stolon-proxy
if [ -z ${ETCD_BIN} ]; then
if [ -z $(which etcd) ]; then
echo "cannot find etcd in PATH and ETCD_BIN environment variable not defined"
exit 1
fi
ETCD_BIN=$(which etcd)
fi
echo "using etcd from $ETCD_BIN"
export ETCD_BIN
go test -timeout 20m $@ -v ${REPO_PATH}/tests/integration go test -timeout 20m $@ -v ${REPO_PATH}/tests/integration
fi fi

View File

@ -29,7 +29,20 @@ import (
etcdm "github.com/sorintlab/stolon/pkg/etcd" etcdm "github.com/sorintlab/stolon/pkg/etcd"
) )
func setupServers(t *testing.T, dir string, numKeepers, numSentinels uint8, syncRepl bool) ([]*TestKeeper, []*TestSentinel) { func setupServers(t *testing.T, dir string, numKeepers, numSentinels uint8, syncRepl bool) ([]*TestKeeper, []*TestSentinel, *TestEtcd) {
te, err := NewTestEtcd(dir)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
if err := te.Start(); err != nil {
t.Fatalf("unexpected err: %v", err)
}
if err := te.WaitUp(10 * time.Second); err != nil {
t.Fatalf("error waiting on etcd up: %v", err)
}
etcdEndpoints := fmt.Sprintf("http://%s:%s", te.listenAddress, te.port)
clusterName := uuid.NewV4().String() clusterName := uuid.NewV4().String()
etcdPath := filepath.Join(common.EtcdBasePath, clusterName) etcdPath := filepath.Join(common.EtcdBasePath, clusterName)
@ -52,7 +65,7 @@ func setupServers(t *testing.T, dir string, numKeepers, numSentinels uint8, sync
tks := []*TestKeeper{} tks := []*TestKeeper{}
tss := []*TestSentinel{} tss := []*TestSentinel{}
tk, err := NewTestKeeper(dir, clusterName) tk, err := NewTestKeeper(dir, clusterName, etcdEndpoints)
if err != nil { if err != nil {
t.Fatalf("unexpected err: %v", err) t.Fatalf("unexpected err: %v", err)
} }
@ -62,7 +75,7 @@ func setupServers(t *testing.T, dir string, numKeepers, numSentinels uint8, sync
// Start sentinels // Start sentinels
for i := uint8(0); i < numSentinels; i++ { for i := uint8(0); i < numSentinels; i++ {
ts, err := NewTestSentinel(dir, clusterName) ts, err := NewTestSentinel(dir, clusterName, etcdEndpoints)
if err != nil { if err != nil {
t.Fatalf("unexpected err: %v", err) t.Fatalf("unexpected err: %v", err)
} }
@ -83,7 +96,7 @@ func setupServers(t *testing.T, dir string, numKeepers, numSentinels uint8, sync
// Start standbys // Start standbys
for i := uint8(1); i < numKeepers; i++ { for i := uint8(1); i < numKeepers; i++ {
tk, err := NewTestKeeper(dir, clusterName) tk, err := NewTestKeeper(dir, clusterName, etcdEndpoints)
if err != nil { if err != nil {
t.Fatalf("unexpected err: %v", err) t.Fatalf("unexpected err: %v", err)
} }
@ -98,7 +111,7 @@ func setupServers(t *testing.T, dir string, numKeepers, numSentinels uint8, sync
} }
tks = append(tks, tk) tks = append(tks, tk)
} }
return tks, tss return tks, tss, te
} }
func populate(t *testing.T, tk *TestKeeper) error { func populate(t *testing.T, tk *TestKeeper) error {
@ -141,7 +154,7 @@ func waitLines(t *testing.T, tk *TestKeeper, num int, timeout time.Duration) err
} }
func shutdown(tks []*TestKeeper, tss []*TestSentinel) { func shutdown(tks []*TestKeeper, tss []*TestSentinel, te *TestEtcd) {
for _, ts := range tss { for _, ts := range tss {
if ts.cmd != nil { if ts.cmd != nil {
ts.Stop() ts.Stop()
@ -152,6 +165,9 @@ func shutdown(tks []*TestKeeper, tss []*TestSentinel) {
tk.Stop() tk.Stop()
} }
} }
if te.cmd != nil {
te.Stop()
}
} }
func getRoles(t *testing.T, tks []*TestKeeper) (master *TestKeeper, standbys []*TestKeeper, err error) { func getRoles(t *testing.T, tks []*TestKeeper) (master *TestKeeper, standbys []*TestKeeper, err error) {
@ -176,8 +192,8 @@ func testMasterStandby(t *testing.T, syncRepl bool) {
} }
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
tks, tss := setupServers(t, dir, 2, 1, syncRepl) tks, tss, te := setupServers(t, dir, 2, 1, syncRepl)
defer shutdown(tks, tss) defer shutdown(tks, tss, te)
master, standbys, err := getRoles(t, tks) master, standbys, err := getRoles(t, tks)
if err != nil { if err != nil {
@ -224,8 +240,8 @@ func testFailover(t *testing.T, syncRepl bool) {
} }
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
tks, tss := setupServers(t, dir, 2, 1, syncRepl) tks, tss, te := setupServers(t, dir, 2, 1, syncRepl)
defer shutdown(tks, tss) defer shutdown(tks, tss, te)
master, standbys, err := getRoles(t, tks) master, standbys, err := getRoles(t, tks)
if err != nil { if err != nil {
@ -270,8 +286,8 @@ func testOldMasterRestart(t *testing.T, syncRepl bool) {
} }
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
tks, tss := setupServers(t, dir, 2, 1, syncRepl) tks, tss, te := setupServers(t, dir, 2, 1, syncRepl)
defer shutdown(tks, tss) defer shutdown(tks, tss, te)
master, standbys, err := getRoles(t, tks) master, standbys, err := getRoles(t, tks)
if err != nil { if err != nil {
@ -337,8 +353,8 @@ func testPartition1(t *testing.T, syncRepl bool) {
} }
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
tks, tss := setupServers(t, dir, 2, 1, syncRepl) tks, tss, te := setupServers(t, dir, 2, 1, syncRepl)
defer shutdown(tks, tss) defer shutdown(tks, tss, te)
master, standbys, err := getRoles(t, tks) master, standbys, err := getRoles(t, tks)
if err != nil { if err != nil {
@ -413,8 +429,8 @@ func testTimelineFork(t *testing.T, syncRepl bool) {
} }
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
tks, tss := setupServers(t, dir, 3, 1, syncRepl) tks, tss, te := setupServers(t, dir, 3, 1, syncRepl)
defer shutdown(tks, tss) defer shutdown(tks, tss, te)
master, standbys, err := getRoles(t, tks) master, standbys, err := getRoles(t, tks)
if err != nil { if err != nil {

View File

@ -30,9 +30,23 @@ func TestInit(t *testing.T) {
t.Fatalf("unexpected err: %v", err) t.Fatalf("unexpected err: %v", err)
} }
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
te, err := NewTestEtcd(dir)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
if err := te.Start(); err != nil {
t.Fatalf("unexpected err: %v", err)
}
if err := te.WaitUp(10 * time.Second); err != nil {
t.Fatalf("error waiting on etcd up: %v", err)
}
etcdEndpoints := fmt.Sprintf("http://%s:%s", te.listenAddress, te.port)
defer te.Stop()
clusterName := uuid.NewV4().String() clusterName := uuid.NewV4().String()
ts, err := NewTestSentinel(dir, clusterName) ts, err := NewTestSentinel(dir, clusterName, etcdEndpoints)
if err != nil { if err != nil {
t.Fatalf("unexpected err: %v", err) t.Fatalf("unexpected err: %v", err)
} }
@ -40,7 +54,7 @@ func TestInit(t *testing.T) {
t.Fatalf("unexpected err: %v", err) t.Fatalf("unexpected err: %v", err)
} }
defer ts.Stop() defer ts.Stop()
tk, err := NewTestKeeper(dir, clusterName) tk, err := NewTestKeeper(dir, clusterName, etcdEndpoints)
if err != nil { if err != nil {
t.Fatalf("unexpected err: %v", err) t.Fatalf("unexpected err: %v", err)
} }
@ -64,12 +78,26 @@ func TestExclusiveLock(t *testing.T) {
t.Fatalf("unexpected err: %v", err) t.Fatalf("unexpected err: %v", err)
} }
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
te, err := NewTestEtcd(dir)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
if err := te.Start(); err != nil {
t.Fatalf("unexpected err: %v", err)
}
if err := te.WaitUp(10 * time.Second); err != nil {
t.Fatalf("error waiting on etcd up: %v", err)
}
etcdEndpoints := fmt.Sprintf("http://%s:%s", te.listenAddress, te.port)
defer te.Stop()
clusterName := uuid.NewV4().String() clusterName := uuid.NewV4().String()
u := uuid.NewV4() u := uuid.NewV4()
id := fmt.Sprintf("%x", u[:4]) id := fmt.Sprintf("%x", u[:4])
tk1, err := NewTestKeeperWithID(dir, id, clusterName) tk1, err := NewTestKeeperWithID(dir, id, clusterName, etcdEndpoints)
if err != nil { if err != nil {
t.Fatalf("unexpected err: %v", err) t.Fatalf("unexpected err: %v", err)
} }
@ -83,7 +111,7 @@ func TestExclusiveLock(t *testing.T) {
t.Fatalf("expecting tk1 up but it's down") t.Fatalf("expecting tk1 up but it's down")
} }
tk2, err := NewTestKeeperWithID(dir, id, clusterName) tk2, err := NewTestKeeperWithID(dir, id, clusterName, etcdEndpoints)
if err != nil { if err != nil {
t.Fatalf("unexpected err: %v", err) t.Fatalf("unexpected err: %v", err)
} }

View File

@ -26,11 +26,13 @@ import (
"os/exec" "os/exec"
"path/filepath" "path/filepath"
"strconv" "strconv"
"strings"
"time" "time"
"github.com/sorintlab/stolon/common" "github.com/sorintlab/stolon/common"
"github.com/sorintlab/stolon/pkg/cluster" "github.com/sorintlab/stolon/pkg/cluster"
etcd "github.com/sorintlab/stolon/Godeps/_workspace/src/github.com/coreos/etcd/client"
_ "github.com/sorintlab/stolon/Godeps/_workspace/src/github.com/lib/pq" _ "github.com/sorintlab/stolon/Godeps/_workspace/src/github.com/lib/pq"
"github.com/sorintlab/stolon/Godeps/_workspace/src/github.com/satori/go.uuid" "github.com/sorintlab/stolon/Godeps/_workspace/src/github.com/satori/go.uuid"
"github.com/sorintlab/stolon/Godeps/_workspace/src/golang.org/x/net/context" "github.com/sorintlab/stolon/Godeps/_workspace/src/golang.org/x/net/context"
@ -50,7 +52,7 @@ type TestKeeper struct {
db *sql.DB db *sql.DB
} }
func NewTestKeeperWithID(dir string, id string, clusterName string) (*TestKeeper, error) { func NewTestKeeperWithID(dir string, id string, clusterName string, etcdEndpoints string) (*TestKeeper, error) {
args := []string{} args := []string{}
dataDir := filepath.Join(dir, fmt.Sprintf("st%s", id)) dataDir := filepath.Join(dir, fmt.Sprintf("st%s", id))
@ -77,6 +79,7 @@ func NewTestKeeperWithID(dir string, id string, clusterName string) (*TestKeeper
args = append(args, fmt.Sprintf("--port=%s", port)) args = append(args, fmt.Sprintf("--port=%s", port))
args = append(args, fmt.Sprintf("--pg-port=%s", pgPort)) args = append(args, fmt.Sprintf("--pg-port=%s", pgPort))
args = append(args, fmt.Sprintf("--data-dir=%s", dataDir)) args = append(args, fmt.Sprintf("--data-dir=%s", dataDir))
args = append(args, fmt.Sprintf("--etcd-endpoints=%s", etcdEndpoints))
args = append(args, "--debug") args = append(args, "--debug")
connString := fmt.Sprintf("postgres://%s:%s/postgres?sslmode=disable", pgListenAddress, pgPort) connString := fmt.Sprintf("postgres://%s:%s/postgres?sslmode=disable", pgListenAddress, pgPort)
@ -103,11 +106,11 @@ func NewTestKeeperWithID(dir string, id string, clusterName string) (*TestKeeper
return tk, nil return tk, nil
} }
func NewTestKeeper(dir string, clusterName string) (*TestKeeper, error) { func NewTestKeeper(dir string, clusterName string, etcdEndpoints string) (*TestKeeper, error) {
u := uuid.NewV4() u := uuid.NewV4()
id := fmt.Sprintf("%x", u[:4]) id := fmt.Sprintf("%x", u[:4])
return NewTestKeeperWithID(dir, id, clusterName) return NewTestKeeperWithID(dir, id, clusterName, etcdEndpoints)
} }
func (tk *TestKeeper) Start() error { func (tk *TestKeeper) Start() error {
@ -399,7 +402,7 @@ type TestSentinel struct {
port string port string
} }
func NewTestSentinel(dir string, clusterName string) (*TestSentinel, error) { func NewTestSentinel(dir string, clusterName string, etcdEndpoints string) (*TestSentinel, error) {
u := uuid.NewV4() u := uuid.NewV4()
id := fmt.Sprintf("%x", u[:4]) id := fmt.Sprintf("%x", u[:4])
@ -416,6 +419,7 @@ func NewTestSentinel(dir string, clusterName string) (*TestSentinel, error) {
args := []string{} args := []string{}
args = append(args, fmt.Sprintf("--cluster-name=%s", clusterName)) args = append(args, fmt.Sprintf("--cluster-name=%s", clusterName))
args = append(args, fmt.Sprintf("--port=%s", port)) args = append(args, fmt.Sprintf("--port=%s", port))
args = append(args, fmt.Sprintf("--etcd-endpoints=%s", etcdEndpoints))
args = append(args, "--debug") args = append(args, "--debug")
sentinelBin := os.Getenv("STSENTINEL_BIN") sentinelBin := os.Getenv("STSENTINEL_BIN")
@ -492,3 +496,159 @@ func (ts *TestSentinel) Stop() {
ts.cmd.Wait() ts.cmd.Wait()
ts.cmd = nil ts.cmd = nil
} }
type TestEtcd struct {
cmd *exec.Cmd
etcdBin string
args []string
listenAddress string
port string
eClient etcd.Client
kAPI etcd.KeysAPI
}
func NewTestEtcd(dir string, a ...string) (*TestEtcd, error) {
dataDir := filepath.Join(dir, "etcd")
// Hack to find a free tcp port
ln, err := net.Listen("tcp", "localhost:0")
if err != nil {
return nil, err
}
defer ln.Close()
ln2, err := net.Listen("tcp", "localhost:0")
if err != nil {
return nil, err
}
defer ln2.Close()
listenAddress := ln.Addr().(*net.TCPAddr).IP.String()
port := strconv.Itoa(ln.Addr().(*net.TCPAddr).Port)
listenAddress2 := ln2.Addr().(*net.TCPAddr).IP.String()
port2 := strconv.Itoa(ln2.Addr().(*net.TCPAddr).Port)
args := []string{}
args = append(args, fmt.Sprintf("--data-dir=%s", dataDir))
args = append(args, fmt.Sprintf("--listen-client-urls=http://%s:%s", listenAddress, port))
args = append(args, fmt.Sprintf("--advertise-client-urls=http://%s:%s", listenAddress, port))
args = append(args, fmt.Sprintf("--listen-peer-urls=http://%s:%s", listenAddress2, port2))
args = append(args, a...)
etcdEndpoints := fmt.Sprintf("http://%s:%s", listenAddress, port)
eCfg := etcd.Config{
Transport: &http.Transport{},
Endpoints: strings.Split(etcdEndpoints, ","),
}
eClient, err := etcd.New(eCfg)
if err != nil {
return nil, err
}
kAPI := etcd.NewKeysAPI(eClient)
etcdBin := os.Getenv("ETCD_BIN")
if etcdBin == "" {
return nil, fmt.Errorf("missing ETCD_BIN env")
}
te := &TestEtcd{
etcdBin: etcdBin,
args: args,
listenAddress: listenAddress,
port: port,
eClient: eClient,
kAPI: kAPI,
}
return te, nil
}
func (te *TestEtcd) Start() error {
if te.cmd != nil {
panic("te: etcd not cleanly stopped")
}
te.cmd = exec.Command(te.etcdBin, te.args...)
err := te.cmd.Start()
if err != nil {
return err
}
return nil
}
func (te *TestEtcd) Signal(sig os.Signal) error {
fmt.Printf("signalling etcd with %s\n", sig)
if te.cmd == nil {
panic("te: cmd is empty")
}
return te.cmd.Process.Signal(sig)
}
func (te *TestEtcd) Kill() {
fmt.Printf("killing etcd")
if te.cmd == nil {
panic("te: cmd is empty")
}
te.cmd.Process.Signal(os.Kill)
te.cmd.Wait()
te.cmd = nil
}
func (te *TestEtcd) Stop() {
fmt.Printf("stopping etcd\n")
if te.cmd == nil {
panic("te: cmd is empty")
}
te.cmd.Process.Signal(os.Interrupt)
te.cmd.Wait()
te.cmd = nil
}
func (te *TestEtcd) WaitProcess(timeout time.Duration) error {
timeoutCh := time.NewTimer(timeout).C
endCh := make(chan error)
go func() {
err := te.cmd.Wait()
endCh <- err
}()
select {
case <-timeoutCh:
return fmt.Errorf("timeout waiting on process")
case <-endCh:
return nil
}
}
func (te *TestEtcd) WaitUp(timeout time.Duration) error {
start := time.Now()
for time.Now().Add(-timeout).Before(start) {
_, err := te.GetEtcdNode(timeout-time.Now().Sub(start), "/")
if err == nil {
return nil
}
time.Sleep(1 * time.Second)
}
return fmt.Errorf("timeout")
}
func (te *TestEtcd) WaitDown(timeout time.Duration) error {
start := time.Now()
for time.Now().Add(-timeout).Before(start) {
_, err := te.GetEtcdNode(timeout-time.Now().Sub(start), "/")
if err != nil {
return nil
}
time.Sleep(2 * time.Second)
}
return fmt.Errorf("timeout")
}
func (te *TestEtcd) GetEtcdNode(timeout time.Duration, path string) (*etcd.Node, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
res, err := te.kAPI.Get(ctx, path, &etcd.GetOptions{Quorum: true})
if err != nil {
return nil, err
}
return res.Node, nil
}