Skip to content

Commit

Permalink
ddl: Use etcd to elect the owner (pingcap#3158)
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala authored and hanfei1991 committed May 17, 2017
1 parent 302aa70 commit ff18bbf
Show file tree
Hide file tree
Showing 7 changed files with 389 additions and 3 deletions.
4 changes: 3 additions & 1 deletion ddl/bg_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,11 @@ func (d *ddl) handleBgJobQueue() error {
return errors.Trace(err)
}

if ChangeOwnerInNewWay {
return nil
}
owner.LastUpdateTS = time.Now().UnixNano()
err = t.SetBgJobOwner(owner)

return errors.Trace(err)
})
if err != nil {
Expand Down
12 changes: 11 additions & 1 deletion ddl/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@

package ddl

import "github.com/pingcap/tidb/model"
import (
"github.com/pingcap/tidb/model"
goctx "golang.org/x/net/context"
)

// Callback is the interface supporting callback function when DDL changed.
type Callback interface {
Expand All @@ -25,6 +28,8 @@ type Callback interface {
OnJobUpdated(job *model.Job)
// OnBgJobUpdated is called after the running background job is updated.
OnBgJobUpdated(job *model.Job)
// OnWatched is called after watching owner is completed.
OnWatched(ctx goctx.Context)
}

// BaseCallback implements Callback.OnChanged interface.
Expand All @@ -50,3 +55,8 @@ func (c *BaseCallback) OnJobUpdated(job *model.Job) {
func (c *BaseCallback) OnBgJobUpdated(job *model.Job) {
// Nothing to do.
}

// OnWatched implements Callback.OnWatched interface.
func (c *BaseCallback) OnWatched(ctx goctx.Context) {
// Nothing to do.
}
12 changes: 12 additions & 0 deletions ddl/callback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/util/testleak"
goctx "golang.org/x/net/context"
)

type testDDLCallback struct {
Expand All @@ -25,6 +26,7 @@ type testDDLCallback struct {
onJobRunBefore func(*model.Job)
onJobUpdated func(*model.Job)
onBgJobUpdated func(*model.Job)
onWatched func(ctx goctx.Context)
}

func (tc *testDDLCallback) OnJobRunBefore(job *model.Job) {
Expand Down Expand Up @@ -54,11 +56,21 @@ func (tc *testDDLCallback) OnBgJobUpdated(job *model.Job) {
tc.BaseCallback.OnBgJobUpdated(job)
}

func (tc *testDDLCallback) OnWatched(ctx goctx.Context) {
if tc.onWatched != nil {
tc.onWatched(ctx)
return
}

tc.BaseCallback.OnWatched(ctx)
}

func (s *testDDLSuite) TestCallback(c *C) {
defer testleak.AfterTest(c)()
cb := &BaseCallback{}
c.Assert(cb.OnChanged(nil), IsNil)
cb.OnJobRunBefore(nil)
cb.OnJobUpdated(nil)
cb.OnBgJobUpdated(nil)
cb.OnWatched(nil)
}
23 changes: 22 additions & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/pingcap/tidb/ast"
Expand All @@ -34,6 +35,7 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/terror"
"github.com/twinj/uuid"
goctx "golang.org/x/net/context"
)

var (
Expand Down Expand Up @@ -161,6 +163,8 @@ type ddl struct {
hook Callback
hookMu sync.RWMutex
store kv.Storage
// worker is used for electing the owner.
worker *worker
// lease is schema seconds.
lease time.Duration
uuid string
Expand All @@ -178,7 +182,9 @@ type ddl struct {
reorgRowCount int64

quitCh chan struct{}
wait sync.WaitGroup
// TODO: Use cancelFunc instead of quitCh.
cancelFunc goctx.CancelFunc
wait sync.WaitGroup
}

// RegisterEventCh registers passed channel for ddl Event.
Expand Down Expand Up @@ -215,6 +221,17 @@ func NewDDL(store kv.Storage, infoHandle *infoschema.Handle, hook Callback, leas
return newDDL(store, infoHandle, hook, lease)
}

// TODO: Move this to the function of newDDL.
func (d *ddl) setWorker(ctx goctx.Context, cli *clientv3.Client) {
d.worker = &worker{
ddlID: d.uuid,
etcdClient: cli,
}

ctx, d.cancelFunc = goctx.WithCancel(ctx)
d.campaignOwners(ctx)
}

func newDDL(store kv.Storage, infoHandle *infoschema.Handle, hook Callback, lease time.Duration) *ddl {
if hook == nil {
hook = &BaseCallback{}
Expand Down Expand Up @@ -298,6 +315,7 @@ func (d *ddl) start() {
d.wait.Add(2)
go d.onBackgroundWorker()
go d.onDDLWorker()

// For every start, we will send a fake job to let worker
// check owner firstly and try to find whether a job exists and run.
asyncNotify(d.ddlJobCh)
Expand All @@ -310,6 +328,9 @@ func (d *ddl) close() {
}

close(d.quitCh)
if d.worker != nil {
d.cancelFunc()
}

d.wait.Wait()
log.Infof("close DDL:%s", d.uuid)
Expand Down
15 changes: 15 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,18 @@ func (d *ddl) getCheckOwnerTimeout(flag JobType) int64 {
}

func (d *ddl) checkOwner(t *meta.Meta, flag JobType) (*model.Owner, error) {
if ChangeOwnerInNewWay {
if flag == ddlJobFlag {
if d.worker.isOwner() {
return nil, nil
}
return nil, errNotOwner
}
if d.worker.isBgOwner() {
return nil, nil
}
return nil, errNotOwner
}
owner, err := d.getJobOwner(t, flag)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -294,6 +306,9 @@ func (d *ddl) handleDDLJobQueue() error {

// Running job may cost some time, so here we must update owner status to
// prevent other become the owner.
if ChangeOwnerInNewWay {
return nil
}
owner.LastUpdateTS = time.Now().UnixNano()
err = t.SetDDLJobOwner(owner)
return errors.Trace(err)
Expand Down
176 changes: 176 additions & 0 deletions ddl/owner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl

import (
"sync/atomic"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/juju/errors"
"github.com/ngaut/log"
goctx "golang.org/x/net/context"
)

// ChangeOwnerInNewWay is used for testing.
var ChangeOwnerInNewWay = false

const (
ddlOwnerKey = "/tidb/ddl/owner"
bgOwnerKey = "/tidb/ddl/bg/owner"
newSessionDefaultRetryCnt = 3
)

// worker represents the structure which is used for electing owner.
type worker struct {
ddlOwner int32
bgOwner int32
ddlID string
etcdClient *clientv3.Client
etcdSession *concurrency.Session
}

func (w *worker) isOwner() bool {
return atomic.LoadInt32(&w.ddlOwner) == 1
}

func (w *worker) setOwner(isOwner bool) {
if isOwner {
atomic.StoreInt32(&w.ddlOwner, 1)
} else {
atomic.StoreInt32(&w.ddlOwner, 0)
}
}

func (w *worker) isBgOwner() bool {
return atomic.LoadInt32(&w.bgOwner) == 1
}

func (w *worker) setBgOwner(isOwner bool) {
if isOwner {
atomic.StoreInt32(&w.bgOwner, 1)
} else {
atomic.StoreInt32(&w.bgOwner, 0)
}
}

func (w *worker) newSession(ctx goctx.Context, retryCnt int) error {
var err error
for i := 0; i < retryCnt; i++ {
w.etcdSession, err = concurrency.NewSession(w.etcdClient, concurrency.WithContext(ctx))
if err != nil {
log.Warnf("[ddl] failed to new session, err %v", err)
time.Sleep(200 * time.Millisecond)
continue
}
break
}
return errors.Trace(err)
}

func (d *ddl) campaignOwners(ctx goctx.Context) error {
err := d.worker.newSession(ctx, newSessionDefaultRetryCnt)
if err != nil {
return errors.Trace(err)
}

d.wait.Add(2)
ddlCtx, _ := goctx.WithCancel(ctx)
go d.campaignLoop(ddlCtx, ddlOwnerKey)

bgCtx, _ := goctx.WithCancel(ctx)
go d.campaignLoop(bgCtx, bgOwnerKey)
return nil
}

func (d *ddl) campaignLoop(ctx goctx.Context, key string) {
defer d.wait.Done()
worker := d.worker
for {
select {
case <-worker.etcdSession.Done():
// TODO: Create session again?
log.Warnf("etcd session is done.")
case <-ctx.Done():
return
default:
}

elec := concurrency.NewElection(worker.etcdSession, key)
err := elec.Campaign(ctx, worker.ddlID)
if err != nil {
log.Infof("[ddl] worker %s failed to campaign, err %v", worker.ddlID, err)
continue
}

// Get owner information.
resp, err := elec.Leader(ctx)
if err != nil {
// If no leader elected currently, it returns ErrElectionNoLeader.
log.Infof("[ddl] failed to get leader, err %v", err)
continue
}
leader := string(resp.Kvs[0].Value)
log.Info("[ddl] %s worker is %s, owner is %v", key, worker.ddlID, leader)
if leader == worker.ddlID {
worker.setOwnerVal(key, true)
} else {
log.Warnf("[ddl] worker %s isn't the owner", worker.ddlID)
continue
}

// TODO: Use content instead of quitCh.
worker.watchOwner(ctx, string(resp.Kvs[0].Key))
worker.setOwnerVal(key, false)
d.hookMu.Lock()
d.hook.OnWatched(ctx)
d.hookMu.Unlock()
}
}

func (w *worker) setOwnerVal(key string, val bool) {
if key == ddlOwnerKey {
w.setOwner(val)
} else {
w.setBgOwner(val)
}
}

func (w *worker) watchOwner(ctx goctx.Context, key string) {
log.Debugf("[ddl] worker %s watch owner key %v", w.ddlID, key)
watchCh := w.etcdClient.Watch(ctx, key)
for {
select {
case resp := <-watchCh:
if resp.Canceled {
log.Infof("[ddl] worker %s watch owner key %v failed, no owner",
w.ddlID, key)
return
}

for _, ev := range resp.Events {
if ev.Type == mvccpb.DELETE {
log.Infof("[ddl] worker %s watch owner key %v failed, owner is deleted", w.ddlID, key)
return
}
}
case <-w.etcdSession.Done():
return
case <-ctx.Done():
return
}
}
}
Loading

0 comments on commit ff18bbf

Please sign in to comment.