Skip to content

Commit

Permalink
Merge pull request sorintlab#514 from sgotti/overcome_postgres_sync_r…
Browse files Browse the repository at this point in the history
…epl_limit_causing_lost_transactions_under_some_events

*: overcome postgres sync repl limit causing lost transactions under some events.
  • Loading branch information
sgotti committed Sep 10, 2018
2 parents 7658043 + 87766c9 commit 4422daf
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 20 deletions.
64 changes: 48 additions & 16 deletions cmd/keeper/cmd/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,8 @@ type PostgresKeeper struct {
pgStateMutex sync.Mutex
getPGStateMutex sync.Mutex
lastPGState *cluster.PostgresState

waitSyncStandbysSynced bool
}

func NewPostgresKeeper(cfg *config, end chan error) (*PostgresKeeper, error) {
Expand Down Expand Up @@ -992,8 +994,8 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
return
}

// Dynamicly generate hba auth from clusterData
pgm.SetHba(p.generateHBA(cd, db))
// Generate hba auth from clusterData
pgm.SetHba(p.generateHBA(cd, db, p.waitSyncStandbysSynced))

var pgParameters common.Parameters

Expand Down Expand Up @@ -1035,6 +1037,7 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
log.Infow("current db UID different than cluster data db UID", "db", p.dbLocalState.UID, "cdDB", db.UID)

pgm.SetRecoveryParameters(nil)
p.waitSyncStandbysSynced = false

switch db.Spec.InitMode {
case cluster.DBInitModeNew:
Expand Down Expand Up @@ -1394,6 +1397,13 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
return
}
if !started {
// if we have syncrepl enabled and the postgres instance is stopped, before opening connections to normal users wait for having the defined synchronousStandbys in sync state.
if db.Spec.SynchronousReplication {
p.waitSyncStandbysSynced = true
log.Infow("not allowing connection as normal users since synchronous replication is enabled and instance was down")
pgm.SetHba(p.generateHBA(cd, db, true))
}

if err = pgm.Start(); err != nil {
log.Errorw("failed to start postgres", zap.Error(err))
return
Expand Down Expand Up @@ -1547,8 +1557,24 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
log.Infow("postgres parameters not changed")
}

// Dynamicly generate hba auth from clusterData
newHBA := p.generateHBA(cd, db)
// Generate hba auth from clusterData

// if we have syncrepl enabled and the postgres instance is stopped, before opening connections to normal users wait for having the defined synchronousStandbys in sync state.
if db.Spec.SynchronousReplication && p.waitSyncStandbysSynced {
inSyncStandbys, err := p.GetInSyncStandbys()
if err != nil {
log.Errorw("failed to retrieve current in sync standbys from instance", zap.Error(err))
return
}
if !util.CompareStringSliceNoOrder(inSyncStandbys, db.Spec.SynchronousStandbys) {
log.Infow("not allowing connection as normal users since synchronous replication is enabled, instance was down and not all sync standbys are synced")
} else {
p.waitSyncStandbysSynced = false
}
} else {
p.waitSyncStandbysSynced = false
}
newHBA := p.generateHBA(cd, db, p.waitSyncStandbysSynced)
if !reflect.DeepEqual(newHBA, pgm.CurHba()) {
log.Infow("postgres hba entries changed, reloading postgres instance")
pgm.SetHba(newHBA)
Expand Down Expand Up @@ -1653,8 +1679,12 @@ func IsMaster(db *cluster.DB) bool {
}
}

// generateHBA generates the instance hba entries depending on the value of DefaultSUReplAccessMode.
func (p *PostgresKeeper) generateHBA(cd *cluster.ClusterData, db *cluster.DB) []string {
// generateHBA generates the instance hba entries depending on the value of
// DefaultSUReplAccessMode.
// When onlyInternal is true only rules needed for replication will be setup
// and the traffic should be permitted only for pgSUUsername standard
// connections and pgReplUsername replication connections.
func (p *PostgresKeeper) generateHBA(cd *cluster.ClusterData, db *cluster.DB, onlyInternal bool) []string {
// Minimal entries for local normal and replication connections needed by the stolon keeper
// Matched local connections are for postgres database and suUsername user with md5 auth
// Matched local replication connections are for replUsername user with md5 auth
Expand Down Expand Up @@ -1693,16 +1723,18 @@ func (p *PostgresKeeper) generateHBA(cd *cluster.ClusterData, db *cluster.DB) []
}
}

// By default, if no custom pg_hba entries are provided, accept
// connections for all databases and users with md5 auth
if db.Spec.PGHBA != nil {
computedHBA = append(computedHBA, db.Spec.PGHBA...)
} else {
computedHBA = append(
computedHBA,
"host all all 0.0.0.0/0 md5",
"host all all ::0/0 md5",
)
if !onlyInternal {
// By default, if no custom pg_hba entries are provided, accept
// connections for all databases and users with md5 auth
if db.Spec.PGHBA != nil {
computedHBA = append(computedHBA, db.Spec.PGHBA...)
} else {
computedHBA = append(
computedHBA,
"host all all 0.0.0.0/0 md5",
"host all all ::0/0 md5",
)
}
}

// return generated Hba merged with user Hba
Expand Down
2 changes: 1 addition & 1 deletion cmd/keeper/cmd/keeper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func TestGenerateHBA(t *testing.T) {
db := cd.DBs[tt.dbUID]
db.Spec.PGHBA = tt.pgHBA

out := p.generateHBA(cd, db)
out := p.generateHBA(cd, db, false)

if !reflect.DeepEqual(out, tt.out) {
var b bytes.Buffer
Expand Down
2 changes: 1 addition & 1 deletion doc/faq.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ stolon let you easily integrate with any backup/restore solution. See the [point

When using async replication the leader sentinel tries to find the best standby using a valid standby with the (last reported) nearest xlog location to the master latest knows xlog location. If a master is down there's no way to know its latest xlog position (stolon get and save it at some intervals) so there's no way to guarantee that the standby is not behind but just that the best standby of the ones available will be choosen.

When using synchronous replication only synchronous standbys will be choosen so standbys behind the master won't be choosen (be aware of postgresql synchronous replication limits explaned in the [postgresql documentation](https://www.postgresql.org/docs/9.6/static/warm-standby.html#SYNCHRONOUS-REPLICATION), for example, when a master restarts while no synchronous standbys are available, the transactions waiting for acknowledgement on the master will be marked as fully committed. We are thinking of a way to avoid this using stolon).
When using synchronous replication only synchronous standbys will be choosen so standbys behind the master won't be choosen (be aware of postgresql synchronous replication limits explaned in the [postgresql documentation](https://www.postgresql.org/docs/9.6/static/warm-standby.html#SYNCHRONOUS-REPLICATION), for example, when a master restarts while no synchronous standbys are available, the transactions waiting for acknowledgement on the master will be marked as fully committed. This is "fixed" by stolon. See the [synchronous replication doc](syncrepl.md).

## Does stolon uses postgres sync replication [quorum methods](https://www.postgresql.org/docs/10/static/runtime-config-replication.html#RUNTIME-CONFIG-REPLICATION-MASTER) (FIRST or ANY)?

Expand Down
29 changes: 29 additions & 0 deletions doc/syncrepl.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,32 @@ Set MinSynchronousStandbys/MaxSynchronousStandbys to a value greater than 1 (onl
```
stolonctl --cluster-name=mycluster --store-backend=etcd update --patch '{ "synchronousReplication" : true, "minSynchronousStandbys": 2, "maxSynchronousStandbys": 3 }'
```

## Handling postgresql sync repl limits under such circumstances

Postgres synchronous replication has a downside explained in the [docs](https://www.postgresql.org/docs/current/static/warm-standby.html)

`If primary restarts while commits are waiting for acknowledgement, those waiting transactions will be marked fully committed once the primary database recovers. There is no way to be certain that all standbys have received all outstanding WAL data at time of the crash of the primary. Some transactions may not show as committed on the standby, even though they show as committed on the primary. The guarantee we offer is that the application will not receive explicit acknowledgement of the successful commit of a transaction until the WAL data is known to be safely received by all the synchronous standbys.`

Under some events this will cause lost transactions. For example:

* Sync standby goes down.
* A client commits a transaction, it blocks waiting for acknowledgement.
* Primary restart, it'll mark the above transaction as fully committed. All the
clients will now see that transaction.
* Primary dies
* Standby comes back.
* The sentinel will elect the standby as the new master since it's in the
synchronous_standby_names list.
* The above transaction will be lost despite synchronous replication being
enabled.

So there can be some conditions where a syncstandby could be elected also if it's missing the last transactions if it was down at the commit time.

It's not easy to fix this issue since these events cannot be resolved by the sentinel because it's not possible to know if a sync standby is really in sync when the master is down (since we cannot query its last wal position and the reporting from the keeper is asynchronous).

But with stolon we have the power to overcome this issue by noticing when a primary restarts (since we control it), allow only "internal" connections until all the defined synchronous standbys are really in sync.

Allowing only "internal" connections means not adding the default rules or the user defined pgHBA rules but only the rules needed for replication (and local communication from the keeper).

Since "internal" rules accepts the defined superuser and replication users, client should not use these roles for normal operation or the above solution won't work (but they shouldn't do it anyway since this could cause exhaustion of reserved superuser connections needed by the keeper to check the instance).
112 changes: 112 additions & 0 deletions tests/integration/ha_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,20 @@ package integration

import (
"context"
"database/sql"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
"syscall"
"testing"
"time"

"github.com/satori/go.uuid"
"github.com/sorintlab/stolon/internal/cluster"
"github.com/sorintlab/stolon/internal/common"
pg "github.com/sorintlab/stolon/internal/postgresql"
"github.com/sorintlab/stolon/internal/store"
)

Expand Down Expand Up @@ -1929,3 +1932,112 @@ func TestForceFailSyncReplStandbyCluster(t *testing.T) {
t.Parallel()
testForceFail(t, false, true)
}

// TestSyncStandbyNotInSync tests that, when using synchronous replication, a
// normal user cannot connect to primary db after it has restarted until all
// defined synchronous standbys are in sync.
func TestSyncStandbyNotInSync(t *testing.T) {
t.Parallel()
dir, err := ioutil.TempDir("", "stolon")
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
defer os.RemoveAll(dir)
clusterName := uuid.NewV4().String()
tks, tss, tp, tstore := setupServers(t, clusterName, dir, 2, 1, true, false, nil)
defer shutdown(tks, tss, tp, tstore)
storePath := filepath.Join(common.StorePrefix, clusterName)
sm := store.NewKVBackedStore(tstore.store, storePath)
master, standbys := waitMasterStandbysReady(t, sm, tks)
standby := standbys[0]
if err := WaitClusterDataSynchronousStandbys([]string{standby.uid}, sm, 30*time.Second); err != nil {
t.Fatalf("expected synchronous standby on keeper %q in cluster data", standby.uid)
}
if err := populate(t, master); err != nil {
t.Fatalf("unexpected err: %v", err)
}
if err := write(t, master, 1, 1); err != nil {
t.Fatalf("unexpected err: %v", err)
}
// create a normal user
if _, err := master.Exec("CREATE USER user01 PASSWORD 'password'"); err != nil {
t.Fatalf("unexpected err: %v", err)
}
if _, err := master.Exec("GRANT ALL ON DATABASE postgres TO user01"); err != nil {
t.Fatalf("unexpected err: %v", err)
}
if _, err := master.Exec("GRANT ALL ON TABLE table01 TO user01"); err != nil {
t.Fatalf("unexpected err: %v", err)
}
connParams := pg.ConnParams{
"user": "user01",
"password": "password",
"host": master.pgListenAddress,
"port": master.pgPort,
"dbname": "postgres",
"sslmode": "disable",
}
connString := connParams.ConnString()
user01db, err := sql.Open("postgres", connString)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
if _, err := user01db.Exec("SELECT * from table01"); err != nil {
t.Fatalf("unexpected err: %v", err)
}
// get the master XLogPos
xLogPos, err := GetXLogPos(master)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
// wait for the keepers to have reported their state
if err := WaitClusterSyncedXLogPos([]*TestKeeper{master, standby}, xLogPos, sm, 20*time.Second); err != nil {
t.Fatalf("unexpected err: %v", err)
}
// the proxy should connect to the right master
if err := tp.WaitRightMaster(master, 3*cluster.DefaultProxyCheckInterval); err != nil {
t.Fatalf("unexpected err: %v", err)
}
// Stop the standby keeper, should also stop the database
t.Logf("Stopping current standby keeper: %s", standby.uid)
standby.Stop()
// this call will block and then exit with an error when the master is restarted
go func() {
write(t, master, 2, 2)
}()
time.Sleep(1 * time.Second)
// restart master
t.Logf("Restarting current master keeper: %s", master.uid)
master.Stop()
master.Start()
waitKeeperReady(t, sm, master)
// The transaction should be fully committed on master
c, err := getLines(t, master)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
if c != 2 {
t.Fatalf("wrong number of lines, want: %d, got: %d", 2, c)
}
// The normal user shouldn't be able to connect
if _, err := user01db.Exec("SELECT * from table01"); err != nil {
exp := `pq: no pg_hba.conf entry for host "127.0.0.1", user "user01", database "postgres"`
if !strings.HasPrefix(err.Error(), exp) {
t.Fatalf("expected error when connecting to db as user01 starting with %q, got err: %q", exp, err.Error())
}
} else {
t.Fatalf("expected error connecting to db as user01, got no err")
}
// Starting the standby keeper
t.Logf("Starting current standby keeper: %s", standby.uid)
standby.Start()
time.Sleep(10 * time.Second)
// The normal user should now be able to connect and see 2 lines
c, err = getLines(t, user01db)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
if c != 2 {
t.Fatalf("wrong number of lines, want: %d, got: %d", 2, c)
}
}
7 changes: 5 additions & 2 deletions tests/integration/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ func pgParametersWithDefaults(p cluster.PGParameters) cluster.PGParameters {
type Querier interface {
Exec(query string, args ...interface{}) (sql.Result, error)
Query(query string, args ...interface{}) (*sql.Rows, error)
}

type ReplQuerier interface {
ReplQuery(query string, args ...interface{}) (*sql.Rows, error)
}

Expand All @@ -94,7 +97,7 @@ func GetPGParameters(q Querier) (common.Parameters, error) {
return pgParameters, nil
}

func GetSystemData(q Querier) (*pg.SystemData, error) {
func GetSystemData(q ReplQuerier) (*pg.SystemData, error) {
rows, err := q.ReplQuery("IDENTIFY_SYSTEM")
if err != nil {
return nil, err
Expand All @@ -116,7 +119,7 @@ func GetSystemData(q Querier) (*pg.SystemData, error) {
return nil, fmt.Errorf("query returned 0 rows")
}

func GetXLogPos(q Querier) (uint64, error) {
func GetXLogPos(q ReplQuerier) (uint64, error) {
// get the current master XLogPos
systemData, err := GetSystemData(q)
if err != nil {
Expand Down

0 comments on commit 4422daf

Please sign in to comment.