Skip to content

Commit

Permalink
domain: raise panic if reload failed and try reload before get.
Browse files Browse the repository at this point in the history
  • Loading branch information
siddontang committed Nov 17, 2015
1 parent 6019d1f commit 8aedcdc
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 16 deletions.
49 changes: 33 additions & 16 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ import (
// Domain represents a storage space. Different domains can use the same database name.
// Multiple domains can be used in parallel without synchronization.
type Domain struct {
store kv.Storage
infoHandle *infoschema.Handle
ddl ddl.DDL
leaseCh chan time.Duration
store kv.Storage
infoHandle *infoschema.Handle
ddl ddl.DDL
leaseCh chan time.Duration
// nano seconds
lastLeaseTS int64
}

Expand Down Expand Up @@ -82,6 +83,8 @@ func (do *Domain) loadInfoSchema(txn kv.Transaction) (err error) {

// InfoSchema gets information schema from domain.
func (do *Domain) InfoSchema() infoschema.InfoSchema {
// try reload if possible.
do.tryReload()
return do.infoHandle.Get()
}

Expand Down Expand Up @@ -110,14 +113,33 @@ func (do *Domain) Stat() (map[string]interface{}, error) {
return nil, errors.Trace(err)
}

m["ddl_last_reload_schema_ts"] = atomic.LoadInt64(&do.lastLeaseTS)
m["ddl_last_reload_schema_ts"] = atomic.LoadInt64(&do.lastLeaseTS) / 1e9

return m, nil
}

func (do *Domain) reload() error {
func (do *Domain) tryReload() {
// if we don't have update the schema for a long time > lease, we must force reloading it.
// Although we try to reload schema every lease time in a goroutine, sometimes it may not
// run accurately, e.g, the machine has a very high load, running the ticker is delayed.
last := atomic.LoadInt64(&do.lastLeaseTS)
lease := do.ddl.GetLease()

// if lease is 0, we use the local store, so no need to reload.
if lease > 0 && time.Now().UnixNano()-last > lease.Nanoseconds() {
do.reload()
}
}

func (do *Domain) reload() {
// if reload error, we will terminate whole program to guarantee data safe.
// TODO: retry some times if reload error.
err := kv.RunInNewTxn(do.store, false, do.loadInfoSchema)
return errors.Trace(err)
if err != nil {
log.Fatalf("reload schema err %v", err)
}

atomic.StoreInt64(&do.lastLeaseTS, time.Now().UnixNano())
}

// check schema every 300 seconds default.
Expand All @@ -134,10 +156,7 @@ func (do *Domain) loadSchemaInLoop(lease time.Duration) {
for {
select {
case <-ticker.C:
if err := do.reload(); err != nil {
log.Fatalf("reload schema err %v", err)
}
atomic.StoreInt64(&do.lastLeaseTS, time.Now().Unix())
do.reload()
case newLease := <-do.leaseCh:
if newLease <= 0 {
newLease = defaultLoadTime
Expand Down Expand Up @@ -167,7 +186,8 @@ func (c *ddlCallback) OnChanged(err error) error {
}
log.Warnf("on DDL change")

return errors.Trace(c.do.reload())
c.do.reload()
return nil
}

// NewDomain creates a new domain.
Expand All @@ -179,10 +199,7 @@ func NewDomain(store kv.Storage, lease time.Duration) (d *Domain, err error) {

d.infoHandle = infoschema.NewHandle(d.store)
d.ddl = ddl.NewDDL(d.store, d.infoHandle, &ddlCallback{do: d}, lease)
err = d.reload()
if err != nil {
log.Fatalf("load schema err %v", err)
}
d.reload()

go d.loadSchemaInLoop(lease)

Expand Down
7 changes: 7 additions & 0 deletions domain/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,11 @@ func (*testSuite) TestT(c *C) {
c.Assert(err, IsNil)

dom.SetLease(10 * time.Second)

m, err := dom.Stat()
c.Assert(err, IsNil)
c.Assert(m["ddl_last_reload_schema_ts"], GreaterEqual, int64(0))

dom.SetLease(50 * time.Millisecond)
time.Sleep(1 * time.Second)
}

0 comments on commit 8aedcdc

Please sign in to comment.