Skip to content

Commit

Permalink
domain: refactor NewDomain (pingcap#5270)
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala authored and tiancaiamao committed Nov 30, 2017
1 parent 09e5e7b commit b66e703
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 35 deletions.
62 changes: 29 additions & 33 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,9 @@ func (do *Domain) mustRestartSyncer() error {

// Close closes the Domain and release its resource.
func (do *Domain) Close() {
terror.Log(errors.Trace(do.ddl.Stop()))
if do.ddl != nil {
terror.Log(errors.Trace(do.ddl.Stop()))
}
close(do.exit)
if do.etcdClient != nil {
terror.Log(errors.Trace(do.etcdClient.Close()))
Expand Down Expand Up @@ -433,19 +435,24 @@ type EtcdBackend interface {
StartGCWorker() error
}

const resourceIdleTimeout = 3 * time.Minute // resources in the ResourcePool will be recycled after idleTimeout

// NewDomain creates a new domain. Should not create multiple domains for the same store.
func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duration, factory pools.Factory, sysFactory func(*Domain) (pools.Resource, error)) (*Domain, error) {
capacity := 200 // capacity of the sysSessionPool size
idleTimeout := 3 * time.Minute // sessions in the sysSessionPool will be recycled after idleTimeout
d := &Domain{
func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duration, factory pools.Factory) *Domain {
capacity := 200 // capacity of the sysSessionPool size
return &Domain{
store: store,
SchemaValidator: NewSchemaValidator(ddlLease),
exit: make(chan struct{}),
sysSessionPool: pools.NewResourcePool(factory, capacity, capacity, idleTimeout),
sysSessionPool: pools.NewResourcePool(factory, capacity, capacity, resourceIdleTimeout),
statsLease: statsLease,
infoHandle: infoschema.NewHandle(store),
}
}

if ebd, ok := store.(EtcdBackend); ok {
// Init initializes a domain.
func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.Resource, error)) error {
if ebd, ok := do.store.(EtcdBackend); ok {
if addrs := ebd.EtcdAddrs(); addrs != nil {
cli, err := clientv3.New(clientv3.Config{
Endpoints: addrs,
Expand All @@ -456,55 +463,44 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
},
})
if err != nil {
return nil, errors.Trace(err)
return errors.Trace(err)
}
d.etcdClient = cli
do.etcdClient = cli
}
}

d.infoHandle = infoschema.NewHandle(d.store)
ctx := goctx.Background()
callback := &ddlCallback{do: d}

// TODO: Here we create new sessions with sysFac in DDL,
// which will use `d` as Domain instead of call `domap.Get`.
// which will use `do` as Domain instead of call `domap.Get`.
// That's because `domap.Get` requires a lock, but before
// we initialize Domain finish, we can't require that again.
// After we remove the lazy logic of creating Domain, we
// can simplify code here.
sysFac := func() (pools.Resource, error) {
return sysFactory(d)
return sysFactory(do)
}
sysCtxPool := pools.NewResourcePool(sysFac, 2, 2, idleTimeout)
d.ddl = ddl.NewDDL(ctx, d.etcdClient, d.store, d.infoHandle, callback, ddlLease, sysCtxPool)
var err error
defer func() {
// Clean up domain when initializing syncer failed or reloading failed.
// If we don't clean it, there are some dirty data when retrying this function.
if err != nil {
d.Close()
log.Errorf("[ddl] new domain failed %v", errors.ErrorStack(errors.Trace(err)))
}
}()
sysCtxPool := pools.NewResourcePool(sysFac, 2, 2, resourceIdleTimeout)
ctx := goctx.Background()
callback := &ddlCallback{do: do}
do.ddl = ddl.NewDDL(ctx, do.etcdClient, do.store, do.infoHandle, callback, ddlLease, sysCtxPool)

err = d.ddl.SchemaSyncer().Init(ctx)
err := do.ddl.SchemaSyncer().Init(ctx)
if err != nil {
return nil, errors.Trace(err)
return errors.Trace(err)
}
err = d.Reload()
err = do.Reload()
if err != nil {
return nil, errors.Trace(err)
return errors.Trace(err)
}

// Only when the store is local that the lease value is 0.
// If the store is local, it doesn't need loadSchemaInLoop.
if ddlLease > 0 {
d.wg.Add(1)
do.wg.Add(1)
// Local store needs to get the change information for every DDL state in each session.
go d.loadSchemaInLoop(ddlLease)
go do.loadSchemaInLoop(ddlLease)
}

return d, nil
return nil
}

// SysSessionPool returns the system session pool.
Expand Down
4 changes: 3 additions & 1 deletion domain/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ func (*testSuite) TestT(c *C) {
defer testleak.AfterTest(c)()
store, err := tikv.NewMockTikvStore()
c.Assert(err, IsNil)
dom, err := NewDomain(store, 80*time.Millisecond, 0, mockFactory, sysMockFactory)
ddlLease := 80 * time.Millisecond
dom := NewDomain(store, ddlLease, 0, mockFactory)
err = dom.Init(ddlLease, sysMockFactory)
c.Assert(err, IsNil)
defer func() {
dom.Close()
Expand Down
8 changes: 7 additions & 1 deletion tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,13 @@ func (dm *domainMap) Get(store kv.Storage) (d *domain.Domain, err error) {
log.Infof("store %v new domain, ddl lease %v, stats lease %d", store.UUID(), ddlLease, statisticLease)
factory := createSessionFunc(store)
sysFactory := createSessionWithDomainFunc(store)
d, err1 = domain.NewDomain(store, ddlLease, statisticLease, factory, sysFactory)
d = domain.NewDomain(store, ddlLease, statisticLease, factory)
err1 = d.Init(ddlLease, sysFactory)
if err1 != nil {
// If we don't clean it, there are some dirty data when retrying the function of Init.
d.Close()
log.Errorf("[ddl] init domain failed %v", errors.ErrorStack(errors.Trace(err1)))
}
return true, errors.Trace(err1)
})
if err != nil {
Expand Down

0 comments on commit b66e703

Please sign in to comment.