Skip to content

Commit

Permalink
domain: support retry if load failed.
Browse files Browse the repository at this point in the history
  • Loading branch information
siddontang committed Dec 5, 2015
1 parent 37fe78c commit 86d04a0
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 51 deletions.
85 changes: 44 additions & 41 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,43 +142,57 @@ func (do *Domain) tryReload() {
}
}

const minReloadTimeout = 20 * time.Second

func (do *Domain) reload() error {
// lock here for only once at same time.
do.m.Lock()
defer do.m.Unlock()

err := kv.RunInNewTxn(do.store, false, do.loadInfoSchema)
if err != nil {
return errors.Trace(err)
timeout := do.ddl.GetLease() / 2
if timeout < minReloadTimeout {
timeout = minReloadTimeout
}

atomic.StoreInt64(&do.lastLeaseTS, time.Now().UnixNano())
return nil
done := make(chan error, 1)
go func() {
var err error

for {
err = kv.RunInNewTxn(do.store, false, do.loadInfoSchema)
// if err is db closed, we will return it directly, otherwise, we will
// check reloading again.
if terror.ErrorEqual(err, localstore.ErrDBClosed) {
break
}

if err != nil {
// TODO: use a backoff algorithm.
time.Sleep(500 * time.Millisecond)
continue
}

atomic.StoreInt64(&do.lastLeaseTS, time.Now().UnixNano())
break
}

done <- err
}()

select {
case err := <-done:
return errors.Trace(err)
case <-time.After(timeout):
return errors.Errorf("reload schema timeout")
}
}

func (do *Domain) mustReload() {
// if reload error, we will terminate whole program to guarantee data safe.
// TODO: retry some times if reload error.
err := do.reload()
if err != nil {
log.Fatalf("reload schema err %v", err)
}
}

const maxReloadTimeout = 60 * time.Second
const minReloadTimeout = 20 * time.Second

func getReloadTimeout(lease time.Duration) time.Duration {
timeout := lease / 4

if timeout >= maxReloadTimeout {
return maxReloadTimeout
log.Fatalf("reload schema err %v", errors.ErrorStack(err))
}
if timeout <= minReloadTimeout {
return minReloadTimeout
}

return timeout
}

// check schema every 300 seconds default.
Expand All @@ -192,26 +206,16 @@ func (do *Domain) loadSchemaInLoop(lease time.Duration) {
ticker := time.NewTicker(lease)
defer ticker.Stop()

reloadTimeout := getReloadTimeout(lease)
reloadErrCh := make(chan error, 1)

for {
select {
case <-ticker.C:
go func() {
reloadErrCh <- do.reload()
}()
select {
case err := <-reloadErrCh:
// we may close store in test, but the domain load schema loop is still checking,
// so we can't panic for ErrDBClosed and just return here.
if terror.ErrorEqual(err, localstore.ErrDBClosed) {
return
} else if err != nil {
log.Fatalf("reload schema err %v", errors.ErrorStack(err))
}
case <-time.After(reloadTimeout):
log.Fatalf("reload schema timeout:%d", reloadTimeout)
err := do.reload()
// we may close store in test, but the domain load schema loop is still checking,
// so we can't panic for ErrDBClosed and just return here.
if terror.ErrorEqual(err, localstore.ErrDBClosed) {
return
} else if err != nil {
log.Fatalf("reload schema err %v", errors.ErrorStack(err))
}
case newLease := <-do.leaseCh:
if newLease <= 0 {
Expand All @@ -224,7 +228,6 @@ func (do *Domain) loadSchemaInLoop(lease time.Duration) {
}

lease = newLease
reloadTimeout = getReloadTimeout(lease)
// reset ticker too.
ticker.Stop()
ticker = time.NewTicker(lease)
Expand Down
19 changes: 9 additions & 10 deletions domain/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
package domain

import (
"sync/atomic"
"testing"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/localstore"
"github.com/pingcap/tidb/store/localstore/goleveldb"
"github.com/pingcap/tidb/util/mock"
Expand Down Expand Up @@ -59,16 +61,13 @@ func (*testSuite) TestT(c *C) {
c.Assert(err, IsNil)
c.Assert(m[ddlLastReloadSchemaTS], GreaterEqual, int64(0))

dom.SetLease(50 * time.Millisecond)
c.Assert(dom.GetScope("dummy_status"), Equals, variable.DefaultScopeFlag)

dom.SetLease(10 * time.Millisecond)
time.Sleep(20 * time.Millisecond)
atomic.StoreInt64(&dom.lastLeaseTS, 0)
dom.tryReload()

store.Close()
time.Sleep(1 * time.Second)
}

func (*testSuite) TestGetReloadTimeout(c *C) {
timeout := getReloadTimeout(241 * time.Second)
c.Assert(timeout, Equals, maxReloadTimeout)
timeout = getReloadTimeout(76 * time.Second)
c.Assert(timeout, Equals, minReloadTimeout)
timeout = getReloadTimeout(120 * time.Second)
c.Assert(timeout, Equals, 30*time.Second)
}

0 comments on commit 86d04a0

Please sign in to comment.