*: syncrepl, consider only really in sync standbys
When using syncrepl, don't consider a standby in sync just because it appears in the synchronous_standby_names parameter; consider it in sync only when the postgres instance actually reports it as in sync (by querying pg_stat_replication).
commit bb89f80e4b (parent e81e5d71b5)
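For context, the check this commit relies on can be exercised standalone. Below is a minimal sketch (not stolon code): it opens a throwaway connection and keeps only the standbys whose sync_state is "sync" in pg_stat_replication. The connection string and helper names are placeholders.

package main

import (
    "context"
    "database/sql"
    "fmt"
    "time"

    _ "github.com/lib/pq"
)

// syncStandbys returns the application_name of every standby the instance
// currently reports as synchronous ("sync" state in pg_stat_replication).
func syncStandbys(ctx context.Context, connString string) ([]string, error) {
    db, err := sql.Open("postgres", connString)
    if err != nil {
        return nil, err
    }
    defer db.Close()

    rows, err := db.QueryContext(ctx, "select application_name, sync_state from pg_stat_replication")
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    inSync := []string{}
    for rows.Next() {
        var name, state string
        if err := rows.Scan(&name, &state); err != nil {
            return nil, err
        }
        // "potential" and "async" walsenders are not really in sync; skip them
        if state == "sync" {
            inSync = append(inSync, name)
        }
    }
    return inSync, rows.Err()
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    standbys, err := syncStandbys(ctx, "host=localhost port=5432 user=postgres sslmode=disable")
    if err != nil {
        fmt.Println("error:", err)
        return
    }
    fmt.Println("in sync standbys:", standbys)
}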
@@ -611,6 +611,22 @@ func parseSynchronousStandbyNames(s string) ([]string, error) {
 	return entries, nil
 }

+func (p *PostgresKeeper) GetInSyncStandbys() ([]string, error) {
+	inSyncStandbysFullName, err := p.pgm.GetSyncStandbys()
+	if err != nil {
+		return nil, fmt.Errorf("failed to retrieve current sync standbys status from instance: %v", err)
+	}
+
+	inSyncStandbys := []string{}
+	for _, s := range inSyncStandbysFullName {
+		if common.IsStolonName(s) {
+			inSyncStandbys = append(inSyncStandbys, common.NameFromStolonName(s))
+		}
+	}
+
+	return inSyncStandbys, nil
+}
+
 func (p *PostgresKeeper) GetPGState(pctx context.Context) (*cluster.PostgresState, error) {
 	p.getPGStateMutex.Lock()
 	defer p.getPGStateMutex.Unlock()
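The filtering above relies on common.IsStolonName and common.NameFromStolonName. A self-contained sketch of the same filtering, assuming the internal application_name prefix is "stolon_" (that prefix is an assumption for illustration, not quoted from this diff):

package main

import (
    "fmt"
    "strings"
)

const stolonPrefix = "stolon_"

// isStolonName reports whether an application_name belongs to a stolon managed standby.
func isStolonName(s string) bool { return strings.HasPrefix(s, stolonPrefix) }

// nameFromStolonName strips the prefix to recover the internal db UID.
func nameFromStolonName(s string) string { return strings.TrimPrefix(s, stolonPrefix) }

func main() {
    reported := []string{"stolon_db2", "external_standby"}
    inSync := []string{}
    for _, s := range reported {
        // keep only internal (stolon managed) standbys, stripping the prefix
        if isStolonName(s) {
            inSync = append(inSync, nameFromStolonName(s))
        }
    }
    fmt.Println(inSync) // [db2]
}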
@@ -644,20 +660,13 @@ func (p *PostgresKeeper) GetPGState(pctx context.Context) (*cluster.PostgresStat
 	log.Debugw("filtered out managed pg parameters", "filteredPGParameters", filteredPGParameters)
 	pgState.PGParameters = filteredPGParameters

-	synchronousStandbyNames, err := parseSynchronousStandbyNames(pgParameters["synchronous_standby_names"])
+	inSyncStandbys, err := p.GetInSyncStandbys()
 	if err != nil {
-		log.Errorw("error parsing synchronous_standby_names", zap.Error(err))
+		log.Errorw("failed to retrieve current in sync standbys from instance", zap.Error(err))
 		return pgState, nil
 	}
-	synchronousStandbys := []string{}
-	for _, n := range synchronousStandbyNames {
-		// pgState.SynchronousStandbys must contain only the internal standbys dbUIDs
-		if !common.IsStolonName(n) {
-			continue
-		}
-		synchronousStandbys = append(synchronousStandbys, common.NameFromStolonName(n))
-	}
-	pgState.SynchronousStandbys = synchronousStandbys
+	pgState.SynchronousStandbys = inSyncStandbys

 	sd, err := p.pgm.GetSystemData()
 	if err != nil {
@@ -306,7 +306,7 @@ func (s *Sentinel) updateKeepersStatus(cd *cluster.ClusterData, keepersInfo clus
 			db.Status.TimelinesHistory = dbs.TimelinesHistory
 			db.Status.PGParameters = cluster.PGParameters(dbs.PGParameters)

-			db.Status.SynchronousStandbys = dbs.SynchronousStandbys
+			db.Status.CurSynchronousStandbys = dbs.SynchronousStandbys

 			db.Status.OlderWalFile = dbs.OlderWalFile
 		} else {
@@ -1202,13 +1202,50 @@ func (s *Sentinel) updateCluster(cd *cluster.ClusterData, pis cluster.ProxiesInf
 				}
 			}

+			// if the current known in sync syncstandbys are different from the required ones,
+			// wait for them and remove the ones that are no longer good
+			if !util.CompareStringSliceNoOrder(masterDB.Status.SynchronousStandbys, masterDB.Spec.SynchronousStandbys) {
+				// remove old syncstandbys from current status
+				masterDB.Status.SynchronousStandbys = util.CommonElements(masterDB.Status.SynchronousStandbys, masterDB.Spec.SynchronousStandbys)
+
+				// add reported in sync syncstandbys to the current status
+				curSyncStandbys := util.CommonElements(masterDB.Status.CurSynchronousStandbys, masterDB.Spec.SynchronousStandbys)
+				toAddSyncStandbys := util.Difference(curSyncStandbys, masterDB.Status.SynchronousStandbys)
+				masterDB.Status.SynchronousStandbys = append(masterDB.Status.SynchronousStandbys, toAddSyncStandbys...)
+
+				// if some of the not yet in sync syncstandbys have failed, set Spec.SynchronousStandbys
+				// to the current in sync ones, so others can be added.
+				notInSyncSyncStandbys := util.Difference(masterDB.Spec.SynchronousStandbys, masterDB.Status.SynchronousStandbys)
+				update := false
+				for _, dbUID := range notInSyncSyncStandbys {
+					if _, ok := newcd.DBs[dbUID]; !ok {
+						log.Infow("one of the new synchronousStandbys has been removed", "db", dbUID, "inSyncStandbys", masterDB.Status.SynchronousStandbys, "synchronousStandbys", masterDB.Spec.SynchronousStandbys)
+						update = true
+						continue
+					}
+					if _, ok := goodStandbys[dbUID]; !ok {
+						log.Infow("one of the new synchronousStandbys is not in a good state", "db", dbUID, "inSyncStandbys", masterDB.Status.SynchronousStandbys, "synchronousStandbys", masterDB.Spec.SynchronousStandbys)
+						update = true
+						continue
+					}
+				}
+				if update {
+					// Use the current known in sync syncStandbys as Spec.SynchronousStandbys
+					log.Infow("setting the expected sync-standbys to the current known in sync sync-standbys", "inSyncStandbys", masterDB.Status.SynchronousStandbys, "synchronousStandbys", masterDB.Spec.SynchronousStandbys)
+					masterDB.Spec.SynchronousStandbys = masterDB.Status.SynchronousStandbys
+
+					// Just sort to always have them in the same order and avoid
+					// unneeded updates to synchronous_standby_names by the keeper.
+					sort.Sort(sort.StringSlice(masterDB.Spec.SynchronousStandbys))
+				}
+			}
+
 			// update synchronousStandbys only if the reported
 			// SynchronousStandbys are the same as the required ones. In
 			// this way, when we have to choose a new master we are sure
 			// that there're no intermediate changes between the
 			// reported standbys and the required ones.
 			if !util.CompareStringSliceNoOrder(masterDB.Status.SynchronousStandbys, masterDB.Spec.SynchronousStandbys) {
-				log.Infof("won't update masterDB required synchronous standby since the latest master reported synchronous standbys are different from the db spec ones", "reported", curMasterDB.Status.SynchronousStandbys, "spec", curMasterDB.Spec.SynchronousStandbys)
+				log.Infow("waiting for new defined synchronous standbys to be in sync", "inSyncStandbys", curMasterDB.Status.SynchronousStandbys, "synchronousStandbys", curMasterDB.Spec.SynchronousStandbys)
 			} else {
 				addFakeStandby := false
 				externalSynchronousStandbys := map[string]struct{}{}
@@ -1358,6 +1395,9 @@ func (s *Sentinel) updateCluster(cd *cluster.ClusterData, pis cluster.ProxiesInf
 					masterDB.Spec.ExternalSynchronousStandbys = append(masterDB.Spec.ExternalSynchronousStandbys, fakeStandbyName)
 				}

+				// remove old syncstandbys from current status
+				masterDB.Status.SynchronousStandbys = util.CommonElements(masterDB.Status.SynchronousStandbys, masterDB.Spec.SynchronousStandbys)
+
 				// Just sort to always have them in the same order and avoid
 				// unneeded updates to synchronous_standby_names by the keeper.
 				sort.Sort(sort.StringSlice(masterDB.Spec.SynchronousStandbys))
@@ -1367,6 +1407,8 @@ func (s *Sentinel) updateCluster(cd *cluster.ClusterData, pis cluster.ProxiesInf
 			masterDB.Spec.SynchronousReplication = false
 			masterDB.Spec.SynchronousStandbys = nil
 			masterDB.Spec.ExternalSynchronousStandbys = nil
+
+			masterDB.Status.SynchronousStandbys = nil
 		}

 		// NotFailed != Good since there can be some dbs that are converging
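The reconciliation in the sentinel hunks above is plain set arithmetic over db UIDs. Below is a self-contained sketch of one pass, with local helpers standing in for util.CommonElements and util.Difference and invented data matching test case #21 (spec wants db2 and db3, only db2 is known in sync, nothing newly reported):

package main

import "fmt"

// common returns the elements of a that are also in b (stand-in for util.CommonElements).
func common(a, b []string) []string {
    set := map[string]bool{}
    for _, v := range b {
        set[v] = true
    }
    out := []string{}
    for _, v := range a {
        if set[v] {
            out = append(out, v)
        }
    }
    return out
}

// difference returns the elements of a that are not in b (stand-in for util.Difference).
func difference(a, b []string) []string {
    set := map[string]bool{}
    for _, v := range b {
        set[v] = true
    }
    out := []string{}
    for _, v := range a {
        if !set[v] {
            out = append(out, v)
        }
    }
    return out
}

func main() {
    spec := []string{"db2", "db3"} // required sync standbys
    status := []string{"db2"}      // known in sync from past reports
    reported := []string{}         // reported in sync by postgres right now

    // drop standbys that are no longer required, then add newly reported ones
    status = common(status, spec)
    status = append(status, difference(common(reported, spec), status)...)

    // standbys still missing from status must become in sync before the spec
    // converges; if one of them fails, the sentinel shrinks the spec to the
    // in sync set so a replacement can be picked
    fmt.Println("in sync:", status, "waiting for:", difference(spec, status))
    // Output: in sync: [db2] waiting for: [db3]
}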
@@ -2403,8 +2403,9 @@ func TestUpdateCluster(t *testing.T) {
 						ExternalSynchronousStandbys: []string{},
 					},
 					Status: cluster.DBStatus{
-						Healthy:           true,
-						CurrentGeneration: 1,
+						Healthy:             true,
+						CurrentGeneration:   1,
+						SynchronousStandbys: []string{},
 					},
 				},
 				"db2": &cluster.DB{
@@ -3174,8 +3175,8 @@ func TestUpdateCluster(t *testing.T) {
 		// #19 One master and two standbys. Synchronous replication already
 		// enabled with MinSynchronousStandbys and MaxSynchronousStandbys to 1
 		// (default).
-		// sync standby db not healthy: the other standby choosed as new sync
-		// standby. Both will appear as SynchronousStandbys
+		// sync standby db2 not healthy: the other standby db3 chosen as new
+		// sync standby. Both will appear as SynchronousStandbys
 		{
 			cd: &cluster.ClusterData{
 				Cluster: &cluster.Cluster{
@@ -3684,7 +3685,521 @@ func TestUpdateCluster(t *testing.T) {
 					},
 				},
 			},
-		// #21 From #19. master have reported the new sync standbys (db2, db3).
+		// #21 From #19. master has not yet reported the new sync standbys as in sync (db3).
+		// db2 will remain the unique real in sync db in db1.Status.SynchronousStandbys
+		{
+			cd: &cluster.ClusterData{
+				Cluster: &cluster.Cluster{
+					UID:        "cluster1",
+					Generation: 1,
+					Spec: &cluster.ClusterSpec{
+						ConvergenceTimeout:     &cluster.Duration{Duration: cluster.DefaultConvergenceTimeout},
+						InitTimeout:            &cluster.Duration{Duration: cluster.DefaultInitTimeout},
+						SyncTimeout:            &cluster.Duration{Duration: cluster.DefaultSyncTimeout},
+						MaxStandbysPerSender:   cluster.Uint16P(cluster.DefaultMaxStandbysPerSender),
+						SynchronousReplication: cluster.BoolP(true),
+					},
+					Status: cluster.ClusterStatus{
+						CurrentGeneration: 1,
+						Phase:             cluster.ClusterPhaseNormal,
+						Master:            "db1",
+					},
+				},
+				Keepers: cluster.Keepers{
+					"keeper1": &cluster.Keeper{
+						UID:  "keeper1",
+						Spec: &cluster.KeeperSpec{},
+						Status: cluster.KeeperStatus{
+							Healthy:         true,
+							LastHealthyTime: now,
+						},
+					},
+					"keeper2": &cluster.Keeper{
+						UID:  "keeper2",
+						Spec: &cluster.KeeperSpec{},
+						Status: cluster.KeeperStatus{
+							Healthy:         true,
+							LastHealthyTime: now,
+						},
+					},
+					"keeper3": &cluster.Keeper{
+						UID:  "keeper3",
+						Spec: &cluster.KeeperSpec{},
+						Status: cluster.KeeperStatus{
+							Healthy:         true,
+							LastHealthyTime: now,
+						},
+					},
+				},
+				DBs: cluster.DBs{
+					"db1": &cluster.DB{
+						UID:        "db1",
+						Generation: 2,
+						ChangeTime: time.Time{},
+						Spec: &cluster.DBSpec{
+							KeeperUID:                   "keeper1",
+							RequestTimeout:              cluster.Duration{Duration: cluster.DefaultRequestTimeout},
+							MaxStandbys:                 cluster.DefaultMaxStandbys,
+							AdditionalWalSenders:        cluster.DefaultAdditionalWalSenders,
+							InitMode:                    cluster.DBInitModeNone,
+							SynchronousReplication:      true,
+							Role:                        common.RoleMaster,
+							Followers:                   []string{"db2", "db3"},
+							SynchronousStandbys:         []string{"db2", "db3"},
+							ExternalSynchronousStandbys: []string{},
+						},
+						Status: cluster.DBStatus{
+							Healthy:                true,
+							CurrentGeneration:      2,
+							SynchronousStandbys:    []string{"db2"},
+							CurSynchronousStandbys: []string{},
+						},
+					},
+					"db2": &cluster.DB{
+						UID:        "db2",
+						Generation: 1,
+						ChangeTime: time.Time{},
+						Spec: &cluster.DBSpec{
+							KeeperUID:              "keeper2",
+							RequestTimeout:         cluster.Duration{Duration: cluster.DefaultRequestTimeout},
+							MaxStandbys:            cluster.DefaultMaxStandbys,
+							AdditionalWalSenders:   cluster.DefaultAdditionalWalSenders,
+							InitMode:               cluster.DBInitModeNone,
+							SynchronousReplication: false,
+							Role:                   common.RoleStandby,
+							Followers:              []string{},
+							FollowConfig: &cluster.FollowConfig{
+								Type:  cluster.FollowTypeInternal,
+								DBUID: "db1",
+							},
+							SynchronousStandbys:         nil,
+							ExternalSynchronousStandbys: nil,
+						},
+						Status: cluster.DBStatus{
+							Healthy:           false,
+							CurrentGeneration: 1,
+						},
+					},
+					"db3": &cluster.DB{
+						UID:        "db3",
+						Generation: 1,
+						ChangeTime: time.Time{},
+						Spec: &cluster.DBSpec{
+							KeeperUID:              "keeper3",
+							RequestTimeout:         cluster.Duration{Duration: cluster.DefaultRequestTimeout},
+							MaxStandbys:            cluster.DefaultMaxStandbys,
+							AdditionalWalSenders:   cluster.DefaultAdditionalWalSenders,
+							InitMode:               cluster.DBInitModeNone,
+							SynchronousReplication: false,
+							Role:                   common.RoleStandby,
+							Followers:              []string{},
+							FollowConfig: &cluster.FollowConfig{
+								Type:  cluster.FollowTypeInternal,
+								DBUID: "db1",
+							},
+							SynchronousStandbys:         nil,
+							ExternalSynchronousStandbys: nil,
+						},
+						Status: cluster.DBStatus{
+							Healthy:           true,
+							CurrentGeneration: 1,
+						},
+					},
+				},
+				Proxy: &cluster.Proxy{
+					Generation: 1,
+					Spec: cluster.ProxySpec{
+						MasterDBUID:    "db1",
+						EnabledProxies: []string{},
+					},
+				},
+			},
+			outcd: &cluster.ClusterData{
+				Cluster: &cluster.Cluster{
+					UID:        "cluster1",
+					Generation: 1,
+					Spec: &cluster.ClusterSpec{
+						ConvergenceTimeout:     &cluster.Duration{Duration: cluster.DefaultConvergenceTimeout},
+						InitTimeout:            &cluster.Duration{Duration: cluster.DefaultInitTimeout},
+						SyncTimeout:            &cluster.Duration{Duration: cluster.DefaultSyncTimeout},
+						MaxStandbysPerSender:   cluster.Uint16P(cluster.DefaultMaxStandbysPerSender),
+						SynchronousReplication: cluster.BoolP(true),
+					},
+					Status: cluster.ClusterStatus{
+						CurrentGeneration: 1,
+						Phase:             cluster.ClusterPhaseNormal,
+						Master:            "db1",
+					},
+				},
+				Keepers: cluster.Keepers{
+					"keeper1": &cluster.Keeper{
+						UID:  "keeper1",
+						Spec: &cluster.KeeperSpec{},
+						Status: cluster.KeeperStatus{
+							Healthy:         true,
+							LastHealthyTime: now,
+						},
+					},
+					"keeper2": &cluster.Keeper{
+						UID:  "keeper2",
+						Spec: &cluster.KeeperSpec{},
+						Status: cluster.KeeperStatus{
+							Healthy:         true,
+							LastHealthyTime: now,
+						},
+					},
+					"keeper3": &cluster.Keeper{
+						UID:  "keeper3",
+						Spec: &cluster.KeeperSpec{},
+						Status: cluster.KeeperStatus{
+							Healthy:         true,
+							LastHealthyTime: now,
+						},
+					},
+				},
+				DBs: cluster.DBs{
+					"db1": &cluster.DB{
+						UID:        "db1",
+						Generation: 2,
+						ChangeTime: time.Time{},
+						Spec: &cluster.DBSpec{
+							KeeperUID:                   "keeper1",
+							RequestTimeout:              cluster.Duration{Duration: cluster.DefaultRequestTimeout},
+							MaxStandbys:                 cluster.DefaultMaxStandbys,
+							AdditionalWalSenders:        cluster.DefaultAdditionalWalSenders,
+							InitMode:                    cluster.DBInitModeNone,
+							SynchronousReplication:      true,
+							Role:                        common.RoleMaster,
+							Followers:                   []string{"db2", "db3"},
+							SynchronousStandbys:         []string{"db2", "db3"},
+							ExternalSynchronousStandbys: []string{},
+						},
+						Status: cluster.DBStatus{
+							Healthy:                true,
+							CurrentGeneration:      2,
+							SynchronousStandbys:    []string{"db2"},
+							CurSynchronousStandbys: []string{},
+						},
+					},
+					"db2": &cluster.DB{
+						UID:        "db2",
+						Generation: 1,
+						ChangeTime: time.Time{},
+						Spec: &cluster.DBSpec{
+							KeeperUID:              "keeper2",
+							RequestTimeout:         cluster.Duration{Duration: cluster.DefaultRequestTimeout},
+							MaxStandbys:            cluster.DefaultMaxStandbys,
+							AdditionalWalSenders:   cluster.DefaultAdditionalWalSenders,
+							InitMode:               cluster.DBInitModeNone,
+							SynchronousReplication: false,
+							Role:                   common.RoleStandby,
+							Followers:              []string{},
+							FollowConfig: &cluster.FollowConfig{
+								Type:  cluster.FollowTypeInternal,
+								DBUID: "db1",
+							},
+							SynchronousStandbys:         nil,
+							ExternalSynchronousStandbys: nil,
+						},
+						Status: cluster.DBStatus{
+							Healthy:           false,
+							CurrentGeneration: 1,
+						},
+					},
+					"db3": &cluster.DB{
+						UID:        "db3",
+						Generation: 1,
+						ChangeTime: time.Time{},
+						Spec: &cluster.DBSpec{
+							KeeperUID:              "keeper3",
+							RequestTimeout:         cluster.Duration{Duration: cluster.DefaultRequestTimeout},
+							MaxStandbys:            cluster.DefaultMaxStandbys,
+							AdditionalWalSenders:   cluster.DefaultAdditionalWalSenders,
+							InitMode:               cluster.DBInitModeNone,
+							SynchronousReplication: false,
+							Role:                   common.RoleStandby,
+							Followers:              []string{},
+							FollowConfig: &cluster.FollowConfig{
+								Type:  cluster.FollowTypeInternal,
+								DBUID: "db1",
+							},
+							SynchronousStandbys:         nil,
+							ExternalSynchronousStandbys: nil,
+						},
+						Status: cluster.DBStatus{
+							Healthy:           true,
+							CurrentGeneration: 1,
+						},
+					},
+				},
+				Proxy: &cluster.Proxy{
+					Generation: 1,
+					Spec: cluster.ProxySpec{
+						MasterDBUID:    "db1",
+						EnabledProxies: []string{},
+					},
+				},
+			},
+		},
+		// #22 From #21. master has not yet reported the new sync standbys as in sync (db3) and db3 is failed.
+		// db2 will remain the unique real in sync db in
+		// db1.Status.SynchronousStandbys and also db1.Spec.SynchronousStandbys
+		// will contain only db2 (db3 removed)
+		{
+			cd: &cluster.ClusterData{
+				Cluster: &cluster.Cluster{
+					UID:        "cluster1",
+					Generation: 1,
+					Spec: &cluster.ClusterSpec{
+						ConvergenceTimeout:     &cluster.Duration{Duration: cluster.DefaultConvergenceTimeout},
+						InitTimeout:            &cluster.Duration{Duration: cluster.DefaultInitTimeout},
+						SyncTimeout:            &cluster.Duration{Duration: cluster.DefaultSyncTimeout},
+						MaxStandbysPerSender:   cluster.Uint16P(cluster.DefaultMaxStandbysPerSender),
+						SynchronousReplication: cluster.BoolP(true),
+					},
+					Status: cluster.ClusterStatus{
+						CurrentGeneration: 1,
+						Phase:             cluster.ClusterPhaseNormal,
+						Master:            "db1",
+					},
+				},
+				Keepers: cluster.Keepers{
+					"keeper1": &cluster.Keeper{
+						UID:  "keeper1",
+						Spec: &cluster.KeeperSpec{},
+						Status: cluster.KeeperStatus{
+							Healthy:         true,
+							LastHealthyTime: now,
+						},
+					},
+					"keeper2": &cluster.Keeper{
+						UID:  "keeper2",
+						Spec: &cluster.KeeperSpec{},
+						Status: cluster.KeeperStatus{
+							Healthy:         true,
+							LastHealthyTime: now,
+						},
+					},
+					"keeper3": &cluster.Keeper{
+						UID:  "keeper3",
+						Spec: &cluster.KeeperSpec{},
+						Status: cluster.KeeperStatus{
+							Healthy:         true,
+							LastHealthyTime: now,
+						},
+					},
+				},
+				DBs: cluster.DBs{
+					"db1": &cluster.DB{
+						UID:        "db1",
+						Generation: 2,
+						ChangeTime: time.Time{},
+						Spec: &cluster.DBSpec{
+							KeeperUID:                   "keeper1",
+							RequestTimeout:              cluster.Duration{Duration: cluster.DefaultRequestTimeout},
+							MaxStandbys:                 cluster.DefaultMaxStandbys,
+							AdditionalWalSenders:        cluster.DefaultAdditionalWalSenders,
+							InitMode:                    cluster.DBInitModeNone,
+							SynchronousReplication:      true,
+							Role:                        common.RoleMaster,
+							Followers:                   []string{"db2", "db3"},
+							SynchronousStandbys:         []string{"db2", "db3"},
+							ExternalSynchronousStandbys: []string{},
+						},
+						Status: cluster.DBStatus{
+							Healthy:                true,
+							CurrentGeneration:      2,
+							SynchronousStandbys:    []string{"db2"},
+							CurSynchronousStandbys: []string{},
+						},
+					},
+					"db2": &cluster.DB{
+						UID:        "db2",
+						Generation: 1,
+						ChangeTime: time.Time{},
+						Spec: &cluster.DBSpec{
+							KeeperUID:              "keeper2",
+							RequestTimeout:         cluster.Duration{Duration: cluster.DefaultRequestTimeout},
+							MaxStandbys:            cluster.DefaultMaxStandbys,
+							AdditionalWalSenders:   cluster.DefaultAdditionalWalSenders,
+							InitMode:               cluster.DBInitModeNone,
+							SynchronousReplication: false,
+							Role:                   common.RoleStandby,
+							Followers:              []string{},
+							FollowConfig: &cluster.FollowConfig{
+								Type:  cluster.FollowTypeInternal,
+								DBUID: "db1",
+							},
+							SynchronousStandbys:         nil,
+							ExternalSynchronousStandbys: nil,
+						},
+						Status: cluster.DBStatus{
+							Healthy:           false,
+							CurrentGeneration: 1,
+						},
+					},
+					"db3": &cluster.DB{
+						UID:        "db3",
+						Generation: 1,
+						ChangeTime: time.Time{},
+						Spec: &cluster.DBSpec{
+							KeeperUID:              "keeper3",
+							RequestTimeout:         cluster.Duration{Duration: cluster.DefaultRequestTimeout},
+							MaxStandbys:            cluster.DefaultMaxStandbys,
+							AdditionalWalSenders:   cluster.DefaultAdditionalWalSenders,
+							InitMode:               cluster.DBInitModeNone,
+							SynchronousReplication: false,
+							Role:                   common.RoleStandby,
+							Followers:              []string{},
+							FollowConfig: &cluster.FollowConfig{
+								Type:  cluster.FollowTypeInternal,
+								DBUID: "db1",
+							},
+							SynchronousStandbys:         nil,
+							ExternalSynchronousStandbys: nil,
+						},
+						Status: cluster.DBStatus{
+							Healthy:           false,
+							CurrentGeneration: 1,
+						},
+					},
+				},
+				Proxy: &cluster.Proxy{
+					Generation: 1,
+					Spec: cluster.ProxySpec{
+						MasterDBUID:    "db1",
+						EnabledProxies: []string{},
+					},
+				},
+			},
+			outcd: &cluster.ClusterData{
+				Cluster: &cluster.Cluster{
+					UID:        "cluster1",
+					Generation: 1,
+					Spec: &cluster.ClusterSpec{
+						ConvergenceTimeout:     &cluster.Duration{Duration: cluster.DefaultConvergenceTimeout},
+						InitTimeout:            &cluster.Duration{Duration: cluster.DefaultInitTimeout},
+						SyncTimeout:            &cluster.Duration{Duration: cluster.DefaultSyncTimeout},
+						MaxStandbysPerSender:   cluster.Uint16P(cluster.DefaultMaxStandbysPerSender),
+						SynchronousReplication: cluster.BoolP(true),
+					},
+					Status: cluster.ClusterStatus{
+						CurrentGeneration: 1,
+						Phase:             cluster.ClusterPhaseNormal,
+						Master:            "db1",
+					},
+				},
+				Keepers: cluster.Keepers{
+					"keeper1": &cluster.Keeper{
+						UID:  "keeper1",
+						Spec: &cluster.KeeperSpec{},
+						Status: cluster.KeeperStatus{
+							Healthy:         true,
+							LastHealthyTime: now,
+						},
+					},
+					"keeper2": &cluster.Keeper{
+						UID:  "keeper2",
+						Spec: &cluster.KeeperSpec{},
+						Status: cluster.KeeperStatus{
+							Healthy:         true,
+							LastHealthyTime: now,
+						},
+					},
+					"keeper3": &cluster.Keeper{
+						UID:  "keeper3",
+						Spec: &cluster.KeeperSpec{},
+						Status: cluster.KeeperStatus{
+							Healthy:         true,
+							LastHealthyTime: now,
+						},
+					},
+				},
+				DBs: cluster.DBs{
+					"db1": &cluster.DB{
+						UID:        "db1",
+						Generation: 3,
+						ChangeTime: time.Time{},
+						Spec: &cluster.DBSpec{
+							KeeperUID:                   "keeper1",
+							RequestTimeout:              cluster.Duration{Duration: cluster.DefaultRequestTimeout},
+							MaxStandbys:                 cluster.DefaultMaxStandbys,
+							AdditionalWalSenders:        cluster.DefaultAdditionalWalSenders,
+							InitMode:                    cluster.DBInitModeNone,
+							SynchronousReplication:      true,
+							Role:                        common.RoleMaster,
+							Followers:                   []string{"db2", "db3"},
+							SynchronousStandbys:         []string{"db2"},
+							ExternalSynchronousStandbys: []string{},
+						},
+						Status: cluster.DBStatus{
+							Healthy:                true,
+							CurrentGeneration:      2,
+							SynchronousStandbys:    []string{"db2"},
+							CurSynchronousStandbys: []string{},
+						},
+					},
+					"db2": &cluster.DB{
+						UID:        "db2",
+						Generation: 1,
+						ChangeTime: time.Time{},
+						Spec: &cluster.DBSpec{
+							KeeperUID:              "keeper2",
+							RequestTimeout:         cluster.Duration{Duration: cluster.DefaultRequestTimeout},
+							MaxStandbys:            cluster.DefaultMaxStandbys,
+							AdditionalWalSenders:   cluster.DefaultAdditionalWalSenders,
+							InitMode:               cluster.DBInitModeNone,
+							SynchronousReplication: false,
+							Role:                   common.RoleStandby,
+							Followers:              []string{},
+							FollowConfig: &cluster.FollowConfig{
+								Type:  cluster.FollowTypeInternal,
+								DBUID: "db1",
+							},
+							SynchronousStandbys:         nil,
+							ExternalSynchronousStandbys: nil,
+						},
+						Status: cluster.DBStatus{
+							Healthy:           false,
+							CurrentGeneration: 1,
+						},
+					},
+					"db3": &cluster.DB{
+						UID:        "db3",
+						Generation: 1,
+						ChangeTime: time.Time{},
+						Spec: &cluster.DBSpec{
+							KeeperUID:              "keeper3",
+							RequestTimeout:         cluster.Duration{Duration: cluster.DefaultRequestTimeout},
+							MaxStandbys:            cluster.DefaultMaxStandbys,
+							AdditionalWalSenders:   cluster.DefaultAdditionalWalSenders,
+							InitMode:               cluster.DBInitModeNone,
+							SynchronousReplication: false,
+							Role:                   common.RoleStandby,
+							Followers:              []string{},
+							FollowConfig: &cluster.FollowConfig{
+								Type:  cluster.FollowTypeInternal,
+								DBUID: "db1",
+							},
+							SynchronousStandbys:         nil,
+							ExternalSynchronousStandbys: nil,
+						},
+						Status: cluster.DBStatus{
+							Healthy:           false,
+							CurrentGeneration: 1,
+						},
+					},
+				},
+				Proxy: &cluster.Proxy{
+					Generation: 1,
+					Spec: cluster.ProxySpec{
+						MasterDBUID:    "db1",
+						EnabledProxies: []string{},
+					},
+				},
+			},
+		},
+		// #23 From #19. master has reported the new sync standbys as in sync (db2, db3).
+		// db2 will be removed from synchronousStandbys
 		{
 			cd: &cluster.ClusterData{
@@ -3748,9 +4263,10 @@ func TestUpdateCluster(t *testing.T) {
 						ExternalSynchronousStandbys: []string{},
 					},
 					Status: cluster.DBStatus{
-						Healthy:             true,
-						CurrentGeneration:   2,
-						SynchronousStandbys: []string{"db2", "db3"},
+						Healthy:                true,
+						CurrentGeneration:      2,
+						SynchronousStandbys:    []string{"db2", "db3"},
+						CurSynchronousStandbys: []string{"db3"},
 					},
 				},
 				"db2": &cluster.DB{
@@ -3873,9 +4389,10 @@ func TestUpdateCluster(t *testing.T) {
 						ExternalSynchronousStandbys: []string{},
 					},
 					Status: cluster.DBStatus{
-						Healthy:             true,
-						CurrentGeneration:   2,
-						SynchronousStandbys: []string{"db2", "db3"},
+						Healthy:                true,
+						CurrentGeneration:      2,
+						SynchronousStandbys:    []string{"db3"},
+						CurSynchronousStandbys: []string{"db3"},
 					},
 				},
 				"db2": &cluster.DB{
@@ -3938,7 +4455,7 @@ func TestUpdateCluster(t *testing.T) {
 					},
 				},
 			},
-		// #22 From previous.
+		// #24 From previous.
 		// master db is not healty. db3 is the unique in common between the
 		// reported (db2, db3) and the required in the spec (db3) so it'll be
 		// elected as new master.
@@ -4190,7 +4707,7 @@ func TestUpdateCluster(t *testing.T) {
 					},
 				},
 			},
-		// #23 One master and two standbys. Synchronous replication already
+		// #25 One master and two standbys. Synchronous replication already
 		// enabled with MinSynchronousStandbys and MaxSynchronousStandbys to 2.
 		// master (db1) and db2 failed, db3 elected as master.
 		// This test checks that the db3 synchronousStandbys are correctly sorted
@@ -631,10 +631,17 @@ type DBStatus struct {
 	PGParameters PGParameters `json:"pgParameters,omitempty"`

-	// DBUIDs of the internal standbys set as synchronous
+	// DBUIDs of the internal standbys currently reported as in sync by the instance
+	CurSynchronousStandbys []string `json:"-"`
+
+	// DBUIDs of the internal standbys that we know are in sync.
+	// They could be currently down but we know that they were reported as in
+	// sync in the past and they are defined inside synchronous_standby_names
+	// so the instance will wait for acknowledgement from them.
+	SynchronousStandbys []string `json:"synchronousStandbys"`
+
 	// NOTE(sgotti) we currently don't report the external synchronous standbys.
 	// If/when needed lets add a new ExternalSynchronousStandbys field
-	SynchronousStandbys []string `json:"synchronousStandbys"`

 	OlderWalFile string `json:"olderWalFile,omitempty"`
 }
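To make the distinction between the two fields concrete, here is a hypothetical snapshot (values invented, local struct standing in for cluster.DBStatus): db3 was reported in sync in the past and stays in SynchronousStandbys even though the instantaneous report no longer lists it.

package main

import "fmt"

type dbStatus struct {
    CurSynchronousStandbys []string // what pg_stat_replication reports right now
    SynchronousStandbys    []string // known in sync; kept while still in synchronous_standby_names
}

func main() {
    s := dbStatus{
        CurSynchronousStandbys: []string{"db2"},
        SynchronousStandbys:    []string{"db2", "db3"},
    }
    // db3 may be temporarily down, but the master still waits for its acks,
    // so it remains a safe failover candidate
    fmt.Printf("%+v\n", s)
}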
@@ -532,6 +532,12 @@ func (p *Manager) SetupRoles() error {
 	return nil
 }

+func (p *Manager) GetSyncStandbys() ([]string, error) {
+	ctx, cancel := context.WithTimeout(context.Background(), p.requestTimeout)
+	defer cancel()
+	return getSyncStandbys(ctx, p.localConnParams)
+}
+
 func (p *Manager) GetReplicationSlots() ([]string, error) {
 	maj, _, err := p.PGDataVersion()
 	if err != nil {
@@ -231,6 +231,34 @@ func getRole(ctx context.Context, connParams ConnParams) (common.Role, error) {
 	return "", fmt.Errorf("no rows returned")
 }

+func getSyncStandbys(ctx context.Context, connParams ConnParams) ([]string, error) {
+	db, err := sql.Open("postgres", connParams.ConnString())
+	if err != nil {
+		return nil, err
+	}
+	defer db.Close()
+
+	rows, err := query(ctx, db, "select application_name, sync_state from pg_stat_replication")
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	syncStandbys := []string{}
+	for rows.Next() {
+		var applicationName, syncState string
+		if err := rows.Scan(&applicationName, &syncState); err != nil {
+			return nil, err
+		}
+
+		if syncState == "sync" {
+			syncStandbys = append(syncStandbys, applicationName)
+		}
+	}
+
+	return syncStandbys, nil
+}
+
 func PGLsnToInt(lsn string) (uint64, error) {
 	parts := strings.Split(lsn, "/")
 	if len(parts) != 2 {
@@ -72,3 +72,14 @@ func CommonElements(a []string, b []string) []string {
 	}
 	return common
 }
+
+// Difference returns elements in a - b
+func Difference(a []string, b []string) []string {
+	diff := []string{}
+	for _, v := range a {
+		if !StringInSlice(b, v) {
+			diff = append(diff, v)
+		}
+	}
+	return diff
+}
@@ -67,3 +67,33 @@ func TestCompareStringSliceNoOrder(t *testing.T) {
 		}
 	}
 }
+
+func TestDifference(t *testing.T) {
+	tests := []struct {
+		a []string
+		b []string
+		r []string
+	}{
+		{[]string{}, []string{}, []string{}},
+		{[]string{"", ""}, []string{""}, []string{}},
+		{[]string{"", ""}, []string{"", ""}, []string{}},
+		{[]string{"", ""}, []string{"a", "", "b"}, []string{}},
+		{[]string{"a", "b"}, []string{"a", "b"}, []string{}},
+		{[]string{"a", "b"}, []string{"b", "a"}, []string{}},
+		{[]string{"a", "b", "c"}, []string{}, []string{"a", "b", "c"}},
+		{[]string{"a", "b", "c"}, []string{"a", "b"}, []string{"c"}},
+		{[]string{"a", "b"}, []string{"a", "b", "c"}, []string{}},
+		{[]string{"a", "b"}, []string{"c", "a", "b"}, []string{}},
+		{[]string{"a", "b", "c"}, []string{"a", "b", "c"}, []string{}},
+		{[]string{"a", "b", "c"}, []string{"b", "c", "a"}, []string{}},
+		{[]string{"a", "b", "c", "a"}, []string{"a", "c", "b", "b"}, []string{}},
+	}
+
+	for i, tt := range tests {
+		r := Difference(tt.a, tt.b)
+		if !CompareStringSliceNoOrder(r, tt.r) {
+			t.Errorf("%d: got %v but wanted: %v a: %v, b: %v", i, r, tt.r, tt.a, tt.b)
+		}
+	}
+}