*: Handle synchronous replication

Users can now specify in the cluster config whether they want synchronous
replication. When enabled, the master's `synchronous_standby_names` server
parameter is set to the followers defined in the cluster view.

Users can switch between async and sync replication at any time without
needing to restart anything.
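
As an illustration of the mechanism (a minimal sketch, not the keeper code itself; the helper name below is hypothetical), the value written to `synchronous_standby_names` is just the comma-joined, sorted list of follower IDs taken from the cluster view, and it is cleared when synchronous replication is disabled:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// buildSyncStandbyNames is a hypothetical helper mirroring what the keeper
// does: when synchronous replication is enabled it joins the sorted follower
// IDs into the value assigned to synchronous_standby_names, otherwise it
// clears the parameter.
func buildSyncStandbyNames(followersIDs []string, syncRepl bool) string {
	if !syncRepl {
		return ""
	}
	ids := append([]string(nil), followersIDs...)
	sort.Strings(ids)
	return strings.Join(ids, ",")
}

func main() {
	fmt.Println(buildSyncStandbyNames([]string{"b4c1", "a2f9"}, true)) // prints "a2f9,b4c1"
}
```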
Simone Gotti 2015-10-16 11:30:11 +02:00
parent dad1c0d9c4
commit 0161b63977
14 changed files with 174 additions and 66 deletions


@ -1,4 +1,4 @@
# stolon - PostgreSQL cloud native manager
# stolon - PostgreSQL cloud native HA replication manager
[![Build Status](https://semaphoreci.com/api/v1/projects/fb01aecd-c3d5-407b-a157-7d5365e9e4b6/565617/badge.svg)](https://semaphoreci.com/sorintlab/stolon)
@ -9,6 +9,7 @@ stolon is a cloud native PostgreSQL manager for PostgreSQL high availability. It
* leverages PostgreSQL streaming replication
* works inside kubernetes letting you handle persistent high availability
* uses [etcd](https://github.com/coreos/etcd) as a highly available data store and for leader election
* asynchronous (default) and synchronous replication.
## Architecture


@ -23,6 +23,7 @@ import (
"os/signal"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
@ -71,7 +72,7 @@ var cfg config
func init() {
cmdKeeper.PersistentFlags().StringVar(&cfg.id, "id", "", "keeper id (must be unique in the cluster)")
cmdKeeper.PersistentFlags().StringVar(&cfg.etcdEndpoints, "etcd-endpoints", "http://127.0.0.1:4001,http://127.0.0.1:2379", "a comma-delimited list of etcd endpoints")
cmdKeeper.PersistentFlags().StringVar(&cfg.etcdEndpoints, "etcd-endpoints", common.DefaultEtcdEndpoints, "a comma-delimited list of etcd endpoints")
cmdKeeper.PersistentFlags().StringVar(&cfg.dataDir, "data-dir", "", "data directory")
cmdKeeper.PersistentFlags().StringVar(&cfg.clusterName, "cluster-name", "", "cluster name")
cmdKeeper.PersistentFlags().StringVar(&cfg.listenAddress, "listen-address", "localhost", "keeper listening address")
@ -96,7 +97,7 @@ var defaultServerParameters = pg.ServerParameters{
}
func (p *PostgresKeeper) getReplConnString(memberState *cluster.MemberState) string {
return fmt.Sprintf("postgres://%s:%s@%s:%s?sslmode=disable", p.clusterConfig.PGReplUser, p.clusterConfig.PGReplPassword, memberState.PGListenAddress, memberState.PGPort)
return fmt.Sprintf("postgres://%s:%s@%s:%s?sslmode=disable&application_name=%s", p.clusterConfig.PGReplUser, p.clusterConfig.PGReplPassword, memberState.PGListenAddress, memberState.PGPort, p.id)
}
func (p *PostgresKeeper) getOurConnString() string {
@ -512,6 +513,23 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
}
}
}
// Setup synchronous replication
syncStandbyNames, _ := pgm.GetServerParameter("synchronous_standby_names")
if p.clusterConfig.SynchronousReplication {
newSyncStandbyNames := strings.Join(followersIDs, ",")
if syncStandbyNames != newSyncStandbyNames {
log.Debugf("needed synchronous_standby_names changed from %q to %q, reconfiguring", syncStandbyNames, newSyncStandbyNames)
pgm.SetServerParameter("synchronous_standby_names", newSyncStandbyNames)
pgm.Reload()
}
} else {
if syncStandbyNames != "" {
log.Debugf("sync replication disabled, removing current synchronous_standby_names %q", syncStandbyNames)
pgm.SetServerParameter("synchronous_standby_names", "")
pgm.Reload()
}
}
}
} else {
log.Infof("our cluster requested state is standby following %q", memberRole.Follow)


@ -55,7 +55,7 @@ type config struct {
var cfg config
func init() {
cmdProxy.PersistentFlags().StringVar(&cfg.etcdEndpoints, "etcd-endpoints", "http://127.0.0.1:4001,http://127.0.0.1:2379", "a comma-delimited list of etcd endpoints")
cmdProxy.PersistentFlags().StringVar(&cfg.etcdEndpoints, "etcd-endpoints", common.DefaultEtcdEndpoints, "a comma-delimited list of etcd endpoints")
cmdProxy.PersistentFlags().StringVar(&cfg.clusterName, "cluster-name", "", "cluster name")
cmdProxy.PersistentFlags().StringVar(&cfg.listenAddress, "listen-address", "127.0.0.1", "proxy listening address")
cmdProxy.PersistentFlags().StringVar(&cfg.port, "port", "5432", "proxy listening port")


@ -60,7 +60,7 @@ type config struct {
var cfg config
func init() {
cmdSentinel.PersistentFlags().StringVar(&cfg.etcdEndpoints, "etcd-endpoints", "http://127.0.0.1:4001,http://127.0.0.1:2379", "a comma-delimited list of etcd endpoints")
cmdSentinel.PersistentFlags().StringVar(&cfg.etcdEndpoints, "etcd-endpoints", common.DefaultEtcdEndpoints, "a comma-delimited list of etcd endpoints")
cmdSentinel.PersistentFlags().StringVar(&cfg.clusterName, "cluster-name", "", "cluster name")
cmdSentinel.PersistentFlags().StringVar(&cfg.keeperKubeLabelSelector, "keeper-kube-label-selector", "", "label selector for discoverying stolon-keeper(s) under kubernetes")
cmdSentinel.PersistentFlags().StringVar(&cfg.keeperPort, "keeper-port", "5431", "stolon-keeper(s) listening port (used by kubernetes discovery)")


@ -23,6 +23,8 @@ const (
SentinelLeaseName = "sentinel-leader"
DefaultEtcdRequestTimeout = 5 * time.Second
DefaultEtcdEndpoints = "http://127.0.0.1:4001,http://127.0.0.1:2379"
)
type Role uint8

doc/syncrepl.md (new file)

@ -0,0 +1,19 @@
# Synchronous replication
**Note:** this is temporary. In the future you will be able to update the cluster config with a stolon client (`stolonctl`) instead of manually writing the full cluster config to etcd, which is quite error prone.
You can enable or disable synchronous replication at any time and the keepers will reconfigure themselves.
To do this, write the cluster config to the etcd path `/stolon/cluster/$CLUSTERNAME/config` using, for example, `curl` or `etcdctl`.
## Enable synchronous replication
Assuming that your cluster name is `mycluster` and etcd is listening on `localhost:2379`:
```
curl http://127.0.0.1:2379/v2/keys/stolon/cluster/mycluster/config -XPUT -d value='{ "synchronousreplication" : true }'
```
or with etcdctl
```
etcdctl set /stolon/cluster/mycluster/config '{ "synchronousreplication" : true }'
```
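
The same config write can also be sketched in Go using the packages touched by this commit (signatures as they appear in the diff; error handling kept minimal). Note that `SetClusterConfig` stores the whole config: fields left at their zero value are omitted from the stored JSON and fall back to the cluster defaults.

```go
package main

import (
	"log"
	"path/filepath"

	"github.com/sorintlab/stolon/common"
	"github.com/sorintlab/stolon/pkg/cluster"
	etcdm "github.com/sorintlab/stolon/pkg/etcd"
)

func main() {
	clusterName := "mycluster"
	etcdPath := filepath.Join(common.EtcdBasePath, clusterName)
	e, err := etcdm.NewEtcdManager(common.DefaultEtcdEndpoints, etcdPath, common.DefaultEtcdRequestTimeout)
	if err != nil {
		log.Fatalf("cannot create etcd manager: %v", err)
	}
	// Enable synchronous replication; other fields are left at their defaults.
	if _, err := e.SetClusterConfig(&cluster.Config{SynchronousReplication: true}); err != nil {
		log.Fatalf("cannot set cluster config: %v", err)
	}
}
```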


@ -16,6 +16,7 @@ package cluster
import (
"reflect"
"sort"
"time"
)
@ -129,6 +130,7 @@ func (cv *ClusterView) Copy() *ClusterView {
return &ncv
}
// Returns a sorted list of followersIDs
func (cv *ClusterView) GetFollowersIDs(id string) []string {
followersIDs := []string{}
for memberID, mr := range cv.MembersRole {
@ -136,6 +138,7 @@ func (cv *ClusterView) GetFollowersIDs(id string) []string {
followersIDs = append(followersIDs, memberID)
}
}
sort.Strings(followersIDs)
return followersIDs
}


@ -27,23 +27,25 @@ const (
var (
DefaultConfig = Config{
RequestTimeout: 10 * time.Second,
SleepInterval: 5 * time.Second,
MemberFailInterval: 20 * time.Second,
PGReplUser: "repluser",
PGReplPassword: "replpassword",
MaxStandbysPerSender: 3,
RequestTimeout: 10 * time.Second,
SleepInterval: 5 * time.Second,
MemberFailInterval: 20 * time.Second,
PGReplUser: "repluser",
PGReplPassword: "replpassword",
MaxStandbysPerSender: 3,
SynchronousReplication: false,
}
)
// jsonConfig is a copy of Config with all the time.Duration types converted to duration.
type jsonConfig struct {
RequestTimeout duration `json:",omitempty"`
SleepInterval duration `json:",omitempty"`
MemberFailInterval duration `json:",omitempty"`
PGReplUser string `json:",omitempty"`
PGReplPassword string `json:",omitempty"`
MaxStandbysPerSender uint `json:",omitempty"`
RequestTimeout duration `json:",omitempty"`
SleepInterval duration `json:",omitempty"`
MemberFailInterval duration `json:",omitempty"`
PGReplUser string `json:",omitempty"`
PGReplPassword string `json:",omitempty"`
MaxStandbysPerSender uint `json:",omitempty"`
SynchronousReplication bool `json:",omitempty"`
}
type Config struct {
@ -54,33 +56,41 @@ type Config struct {
// Interval after the first fail to declare a member as not healthy.
MemberFailInterval time.Duration
// PostgreSQL replication username
PGReplUser string `json:",omitempty"`
PGReplUser string
// PostgreSQL replication password
PGReplPassword string `json:",omitempty"`
PGReplPassword string
// Max number of standbys for every sender. A sender can be a master or
// another standby (with cascading replication).
MaxStandbysPerSender uint `json:",omitempty"`
MaxStandbysPerSender uint
// Use Synchronous replication between master and its standbys
SynchronousReplication bool
}
func (c *Config) MarshalJSON() ([]byte, error) {
return json.Marshal(configToJsonConfig(c))
}
func configToJsonConfig(c *Config) *jsonConfig {
return &jsonConfig{
RequestTimeout: duration(c.RequestTimeout),
SleepInterval: duration(c.SleepInterval),
MemberFailInterval: duration(c.MemberFailInterval),
PGReplUser: c.PGReplUser,
PGReplPassword: c.PGReplPassword,
MaxStandbysPerSender: c.MaxStandbysPerSender,
RequestTimeout: duration(c.RequestTimeout),
SleepInterval: duration(c.SleepInterval),
MemberFailInterval: duration(c.MemberFailInterval),
PGReplUser: c.PGReplUser,
PGReplPassword: c.PGReplPassword,
MaxStandbysPerSender: c.MaxStandbysPerSender,
SynchronousReplication: c.SynchronousReplication,
}
}
func jsonConfigToConfig(c *jsonConfig) *Config {
return &Config{
RequestTimeout: time.Duration(c.RequestTimeout),
SleepInterval: time.Duration(c.SleepInterval),
MemberFailInterval: time.Duration(c.MemberFailInterval),
PGReplUser: c.PGReplUser,
PGReplPassword: c.PGReplPassword,
MaxStandbysPerSender: c.MaxStandbysPerSender,
RequestTimeout: time.Duration(c.RequestTimeout),
SleepInterval: time.Duration(c.SleepInterval),
MemberFailInterval: time.Duration(c.MemberFailInterval),
PGReplUser: c.PGReplUser,
PGReplPassword: c.PGReplPassword,
MaxStandbysPerSender: c.MaxStandbysPerSender,
SynchronousReplication: c.SynchronousReplication,
}
}
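
The doc above uses lowercase keys such as `synchronousreplication` while the struct fields are CamelCase and carry no explicit JSON names; this works because `encoding/json` matches keys case-insensitively when decoding. A self-contained sketch (plain `encoding/json`, no stolon types):

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// demoConfig mirrors the relevant part of jsonConfig: no explicit json name,
// so the lowercase key used in the docs still matches on unmarshal.
type demoConfig struct {
	SynchronousReplication bool `json:",omitempty"`
}

func main() {
	var c demoConfig
	if err := json.Unmarshal([]byte(`{ "synchronousreplication" : true }`), &c); err != nil {
		log.Fatal(err)
	}
	fmt.Println(c.SynchronousReplication) // true
}
```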


@ -82,14 +82,15 @@ func TestParseConfig(t *testing.T) {
},
// All options defined
{
in: `{ "requestTimeout": "10s", "sleepInterval": "10s", "memberFailInterval": "100s", "pgrepluser": "username", "pgreplpassword": "password", "maxstandbyspersender": 5 }`,
in: `{ "requestTimeout": "10s", "sleepInterval": "10s", "memberFailInterval": "100s", "pgrepluser": "username", "pgreplpassword": "password", "maxstandbyspersender": 5, "synchronousreplication": true}`,
cfg: mergeDefaultConfig(&Config{
RequestTimeout: 10 * time.Second,
SleepInterval: 10 * time.Second,
MemberFailInterval: 100 * time.Second,
PGReplUser: "username",
PGReplPassword: "password",
MaxStandbysPerSender: 5,
RequestTimeout: 10 * time.Second,
SleepInterval: 10 * time.Second,
MemberFailInterval: 100 * time.Second,
PGReplUser: "username",
PGReplPassword: "password",
MaxStandbysPerSender: 5,
SynchronousReplication: true,
}),
err: nil,
},
@ -135,5 +136,8 @@ func mergeDefaultConfig(ic *Config) *Config {
if ic.MaxStandbysPerSender != 0 {
c.MaxStandbysPerSender = ic.MaxStandbysPerSender
}
if ic.SynchronousReplication != false {
c.SynchronousReplication = ic.SynchronousReplication
}
return &c
}


@ -70,7 +70,7 @@ func (e *EtcdManager) NewLeaseManager() lease.Manager {
return lease.NewEtcdLeaseManager(e.kAPI, e.etcdPath, e.requestTimeout)
}
func (e *EtcdManager) SetClusterConfig(cfg cluster.Config) (*etcd.Response, error) {
func (e *EtcdManager) SetClusterConfig(cfg *cluster.Config) (*etcd.Response, error) {
cfgj, err := json.Marshal(cfg)
if err != nil {
return nil, err


@ -368,8 +368,7 @@ func (p *Manager) GetPrimaryConninfo() (connParams, error) {
for scanner.Scan() {
m := regex.FindStringSubmatch(scanner.Text())
if len(m) == 2 {
connString := m[1]
return ParseConnString(connString)
return ParseConnString(m[1])
}
}
return nil, nil
@ -483,11 +482,15 @@ func (p *Manager) SyncFromMaster(masterconnString string) error {
defer os.Remove(pgpass.Name())
defer pgpass.Close()
pgpass.WriteString(fmt.Sprintf("%s:%s:*:%s:%s\n", cp.Get("host"), cp.Get("port"), cp.Get("user"), cp.Get("password")))
host := cp.Get("host")
port := cp.Get("port")
user := cp.Get("user")
password := cp.Get("password")
pgpass.WriteString(fmt.Sprintf("%s:%s:*:%s:%s\n", host, port, user, password))
log.Infof("Running pg_basebackup\n")
name := filepath.Join(p.pgBinPath, "pg_basebackup")
cmd := exec.Command(name, "-R", "-D", p.dataDir, "--host="+cp.Get("host"), "--port="+cp.Get("port"), "-U", cp.Get("user"))
cmd := exec.Command(name, "-R", "-D", p.dataDir, "--host="+host, "--port="+port, "-U", user)
cmd.Env = append(cmd.Env, fmt.Sprintf("PGPASSFILE=%s", pgpass.Name()))
log.Debugf("execing cmd: %s", cmd)
out, err := cmd.CombinedOutput()

test

@ -92,7 +92,7 @@ if [ -n "$INTEGRATION" ]; then
export STKEEPER_BIN=${BINDIR}/stolon-keeper
export STSENTINEL_BIN=${BINDIR}/stolon-sentinel
export STPROXY_BIN=${BINDIR}/stolon-proxy
go test $@ -v ${REPO_PATH}/tests/integration
go test -timeout 20m $@ -v ${REPO_PATH}/tests/integration
fi
echo "Success"


@ -18,21 +18,31 @@ import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"syscall"
"testing"
"time"
"github.com/sorintlab/stolon/Godeps/_workspace/src/github.com/satori/go.uuid"
"github.com/sorintlab/stolon/common"
"github.com/sorintlab/stolon/pkg/cluster"
etcdm "github.com/sorintlab/stolon/pkg/etcd"
)
func setupServers(t *testing.T, dir string, numKeepers, numSentinels uint8) ([]*TestKeeper, []*TestSentinel) {
cluster := uuid.NewV4().String()
func setupServers(t *testing.T, dir string, numKeepers, numSentinels uint8, syncRepl bool) ([]*TestKeeper, []*TestSentinel) {
clusterName := uuid.NewV4().String()
etcdPath := filepath.Join(common.EtcdBasePath, clusterName)
e, err := etcdm.NewEtcdManager(common.DefaultEtcdEndpoints, etcdPath, common.DefaultEtcdRequestTimeout)
if err != nil {
t.Fatalf("cannot create etcd manager: %v", err)
}
e.SetClusterConfig(&cluster.Config{SynchronousReplication: syncRepl})
tms := []*TestKeeper{}
tss := []*TestSentinel{}
tm, err := NewTestKeeper(dir, cluster)
tm, err := NewTestKeeper(dir, clusterName)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
@ -42,7 +52,7 @@ func setupServers(t *testing.T, dir string, numKeepers, numSentinels uint8) ([]*
// Start sentinels
for i := uint8(0); i < numSentinels; i++ {
ts, err := NewTestSentinel(dir, cluster)
ts, err := NewTestSentinel(dir, clusterName)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
@ -63,7 +73,7 @@ func setupServers(t *testing.T, dir string, numKeepers, numSentinels uint8) ([]*
// Start standbys
for i := uint8(1); i < numKeepers; i++ {
tm, err := NewTestKeeper(dir, cluster)
tm, err := NewTestKeeper(dir, clusterName)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
@ -129,8 +139,7 @@ func shutdown(tms []*TestKeeper, tss []*TestSentinel) {
}
for _, tm := range tms {
if tm.cmd != nil {
tm.SignalPG(os.Kill)
tm.Signal(os.Kill)
tm.Stop()
}
}
}
@ -150,14 +159,14 @@ func getRoles(t *testing.T, tms []*TestKeeper) (master *TestKeeper, standbys []*
return
}
func TestMasterStandby(t *testing.T) {
func testMasterStandby(t *testing.T, syncRepl bool) {
dir, err := ioutil.TempDir("", "stolon")
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
defer os.RemoveAll(dir)
tms, tss := setupServers(t, dir, 2, 1)
tms, tss := setupServers(t, dir, 2, 1, syncRepl)
defer shutdown(tms, tss)
master, standbys, err := getRoles(t, tms)
@ -190,14 +199,22 @@ func TestMasterStandby(t *testing.T) {
}
func TestFailover(t *testing.T) {
func TestMasterStandby(t *testing.T) {
testMasterStandby(t, false)
}
func TestMasterStandbySyncRepl(t *testing.T) {
testMasterStandby(t, true)
}
func testFailover(t *testing.T, syncRepl bool) {
dir, err := ioutil.TempDir("", "stolon")
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
defer os.RemoveAll(dir)
tms, tss := setupServers(t, dir, 2, 1)
tms, tss := setupServers(t, dir, 2, 1, syncRepl)
defer shutdown(tms, tss)
master, standbys, err := getRoles(t, tms)
@ -229,14 +246,21 @@ func TestFailover(t *testing.T) {
}
}
func TestOldMasterRestart(t *testing.T) {
func TestFailover(t *testing.T) {
testFailover(t, false)
}
func TestFailoverSyncRepl(t *testing.T) {
testFailover(t, true)
}
func testOldMasterRestart(t *testing.T, syncRepl bool) {
dir, err := ioutil.TempDir("", "stolon")
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
defer os.RemoveAll(dir)
tms, tss := setupServers(t, dir, 2, 1)
tms, tss := setupServers(t, dir, 2, 1, syncRepl)
defer shutdown(tms, tss)
master, standbys, err := getRoles(t, tms)
@ -288,14 +312,22 @@ func TestOldMasterRestart(t *testing.T) {
}
}
func TestPartition1(t *testing.T) {
func TestOldMasterRestart(t *testing.T) {
testOldMasterRestart(t, false)
}
func TestOldMasterRestartSyncRepl(t *testing.T) {
testOldMasterRestart(t, true)
}
func testPartition1(t *testing.T, syncRepl bool) {
dir, err := ioutil.TempDir("", "stolon")
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
defer os.RemoveAll(dir)
tms, tss := setupServers(t, dir, 2, 1)
tms, tss := setupServers(t, dir, 2, 1, syncRepl)
defer shutdown(tms, tss)
master, standbys, err := getRoles(t, tms)
@ -356,14 +388,22 @@ func TestPartition1(t *testing.T) {
}
}
func TestTimelineFork(t *testing.T) {
func TestPartition1(t *testing.T) {
testPartition1(t, false)
}
func TestPartition1SyncRepl(t *testing.T) {
testPartition1(t, true)
}
func testTimelineFork(t *testing.T, syncRepl bool) {
dir, err := ioutil.TempDir("", "stolon")
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
defer os.RemoveAll(dir)
tms, tss := setupServers(t, dir, 3, 1)
tms, tss := setupServers(t, dir, 3, 1, syncRepl)
defer shutdown(tms, tss)
master, standbys, err := getRoles(t, tms)
@ -442,3 +482,11 @@ func TestTimelineFork(t *testing.T) {
t.Fatalf("unexpected err: %v", err)
}
}
func TestTimelineFork(t *testing.T) {
testTimelineFork(t, false)
}
func TestTimelineForkSyncRepl(t *testing.T) {
testTimelineFork(t, true)
}


@ -41,7 +41,7 @@ type TestKeeper struct {
db *sql.DB
}
func NewTestKeeper(dir string, cluster string) (*TestKeeper, error) {
func NewTestKeeper(dir string, clusterName string) (*TestKeeper, error) {
configFile, err := ioutil.TempFile(dir, "conf")
if err != nil {
return nil, err
@ -70,7 +70,7 @@ func NewTestKeeper(dir string, cluster string) (*TestKeeper, error) {
port2 := ln2.Addr().(*net.TCPAddr).Port
args = append(args, fmt.Sprintf("--id=%s", id))
args = append(args, fmt.Sprintf("--cluster-name=%s", cluster))
args = append(args, fmt.Sprintf("--cluster-name=%s", clusterName))
args = append(args, fmt.Sprintf("--port=%d", port))
args = append(args, fmt.Sprintf("--pg-port=%d", port2))
args = append(args, fmt.Sprintf("--data-dir=%s", dataDir))
@ -298,12 +298,12 @@ type TestSentinel struct {
args []string
}
func NewTestSentinel(dir string, cluster string) (*TestSentinel, error) {
func NewTestSentinel(dir string, clusterName string) (*TestSentinel, error) {
u := uuid.NewV4()
id := fmt.Sprintf("%x", u[:4])
args := []string{}
args = append(args, fmt.Sprintf("--cluster-name=%s", cluster))
args = append(args, fmt.Sprintf("--cluster-name=%s", clusterName))
args = append(args, "--debug")
sentinelBin := os.Getenv("STSENTINEL_BIN")
if sentinelBin == "" {