Skip to content

Commit

Permalink
server: support resigning ddl owner, use http method ddl/owner/resign (
Browse files Browse the repository at this point in the history
  • Loading branch information
winkyao authored Sep 11, 2018
1 parent 5ad005b commit f497444
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 35 deletions.
2 changes: 1 addition & 1 deletion ddl/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (s *testSchemaSuite) TestSchemaWaitJob(c *C) {
ctx := testNewContext(d2)

// d2 must not be owner.
d2.ownerManager.SetOwner(false)
d2.ownerManager.RetireOwner()

dbInfo := testSchemaInfo(c, d2, "test")
testCreateSchema(c, ctx, d2, dbInfo)
Expand Down
8 changes: 8 additions & 0 deletions docs/tidb_http_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,11 @@ timezone.*
curl http://{TiDBIP}:10080/tables/{colID}/{colFlag}/{colLen}?rowBin={val}
```
*Hint: For the column which field type is timezone dependent, e.g. `timestamp`, convert its value to UTC timezone.*

1. Resign the ddl owner, let tidb start a new ddl owner election.

```shell
curl -X POST http://{TiDBIP}:10080/ddl/owner/resign
```

**Note**: If you request a tidb that is not ddl owner, the response will be `This node is not a ddl owner, can't be resigned.`
77 changes: 51 additions & 26 deletions owner/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"strconv"
"sync/atomic"
"time"
"unsafe"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
Expand All @@ -45,12 +46,14 @@ type Manager interface {
ID() string
// IsOwner returns whether the ownerManager is the owner.
IsOwner() bool
// SetOwner sets whether the ownerManager is the owner.
SetOwner(isOwner bool)
// RetireOwner make the manager to be a not owner. It's exported for testing.
RetireOwner()
// GetOwnerID gets the owner ID.
GetOwnerID(ctx context.Context) (string, error)
// CampaignOwner campaigns the owner.
CampaignOwner(ctx context.Context) error
// ResignOwner lets the owner start a new election.
ResignOwner(ctx context.Context) error
// Cancel cancels this etcd ownerManager campaign.
Cancel()
}
Expand All @@ -60,6 +63,7 @@ const (
NewSessionDefaultRetryCnt = 3
// NewSessionRetryUnlimited is the unlimited retry times when create new session.
NewSessionRetryUnlimited = math.MaxInt64
keyOpDefaultTimeout = 5 * time.Second
)

// DDLOwnerChecker is used to check whether tidb is owner.
Expand All @@ -70,22 +74,24 @@ type DDLOwnerChecker interface {

// ownerManager represents the structure which is used for electing owner.
type ownerManager struct {
owner int32
id string // id is the ID of the manager.
key string
prompt string
etcdCli *clientv3.Client
cancel context.CancelFunc
id string // id is the ID of the manager.
key string
prompt string
logPrefix string
etcdCli *clientv3.Client
cancel context.CancelFunc
elec unsafe.Pointer
}

// NewOwnerManager creates a new Manager.
func NewOwnerManager(etcdCli *clientv3.Client, prompt, id, key string, cancel context.CancelFunc) Manager {
return &ownerManager{
etcdCli: etcdCli,
id: id,
key: key,
prompt: prompt,
cancel: cancel,
etcdCli: etcdCli,
id: id,
key: key,
prompt: prompt,
cancel: cancel,
logPrefix: fmt.Sprintf("[%s] %s ownerManager %s", prompt, key, id),
}
}

Expand All @@ -96,16 +102,7 @@ func (m *ownerManager) ID() string {

// IsOwner implements Manager.IsOwner interface.
func (m *ownerManager) IsOwner() bool {
return atomic.LoadInt32(&m.owner) == 1
}

// SetOwner implements Manager.SetOwner interface.
func (m *ownerManager) SetOwner(isOwner bool) {
if isOwner {
atomic.StoreInt32(&m.owner, 1)
} else {
atomic.StoreInt32(&m.owner, 0)
}
return atomic.LoadPointer(&m.elec) != unsafe.Pointer(nil)
}

// Cancel implements Manager.Cancel interface.
Expand Down Expand Up @@ -179,6 +176,33 @@ func (m *ownerManager) CampaignOwner(ctx context.Context) error {
return nil
}

// ResignOwner lets the owner start a new election.
func (m *ownerManager) ResignOwner(ctx context.Context) error {
elec := (*concurrency.Election)(atomic.LoadPointer(&m.elec))
if elec == nil {
return errors.Errorf("This node is not a ddl owner, can't be resigned.")
}

childCtx, cancel := context.WithTimeout(ctx, keyOpDefaultTimeout)
err := elec.Resign(childCtx)
cancel()
if err != nil {
return errors.Trace(err)
}

log.Warnf("%s Resign ddl owner success!", m.logPrefix)
return nil
}

func (m *ownerManager) toBeOwner(elec *concurrency.Election) {
atomic.StorePointer(&m.elec, unsafe.Pointer(elec))
}

// RetireOwner make the manager to be a not owner.
func (m *ownerManager) RetireOwner() {
atomic.StorePointer(&m.elec, nil)
}

func (m *ownerManager) campaignLoop(ctx context.Context, etcdSession *concurrency.Session) {
defer func() {
if r := recover(); r != nil {
Expand All @@ -188,7 +212,7 @@ func (m *ownerManager) campaignLoop(ctx context.Context, etcdSession *concurrenc
}
}()

logPrefix := fmt.Sprintf("[%s] %s ownerManager %s", m.prompt, m.key, m.id)
logPrefix := m.logPrefix
var err error
for {
if err != nil {
Expand Down Expand Up @@ -232,9 +256,10 @@ func (m *ownerManager) campaignLoop(ctx context.Context, etcdSession *concurrenc
if err != nil {
continue
}
m.SetOwner(true)

m.toBeOwner(elec)
m.watchOwner(ctx, etcdSession, ownerKey)
m.SetOwner(false)
m.RetireOwner()

metrics.CampaignOwnerCounter.WithLabelValues(m.prompt, metrics.NoLongerOwner).Inc()
log.Warnf("%s isn't the owner", logPrefix)
Expand Down
24 changes: 16 additions & 8 deletions owner/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ func (m *mockManager) IsOwner() bool {
return atomic.LoadInt32(&m.owner) == 1
}

// SetOwner implements Manager.SetOwner interface.
func (m *mockManager) SetOwner(isOwner bool) {
if isOwner {
atomic.StoreInt32(&m.owner, 1)
} else {
atomic.StoreInt32(&m.owner, 0)
}
func (m *mockManager) toBeOwner() {
atomic.StoreInt32(&m.owner, 1)
}

// RetireOwner implements Manager.RetireOwner interface.
func (m *mockManager) RetireOwner() {
atomic.StoreInt32(&m.owner, 0)
}

// Cancel implements Manager.Cancel interface.
Expand All @@ -73,6 +73,14 @@ func (m *mockManager) GetOwnerID(ctx context.Context) (string, error) {

// CampaignOwner implements Manager.CampaignOwner interface.
func (m *mockManager) CampaignOwner(_ context.Context) error {
m.SetOwner(true)
m.toBeOwner()
return nil
}

// ResignOwner lets the owner start a new election.
func (m *mockManager) ResignOwner(ctx context.Context) error {
if m.IsOwner() {
m.RetireOwner()
}
return nil
}
36 changes: 36 additions & 0 deletions server/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,11 @@ type ddlHistoryJobHandler struct {
*tikvHandlerTool
}

// ddlResignOwnerHandler is the handler for resigning ddl owner.
type ddlResignOwnerHandler struct {
store kv.Storage
}

type serverInfoHandler struct {
*tikvHandlerTool
}
Expand Down Expand Up @@ -713,6 +718,37 @@ func (h ddlHistoryJobHandler) ServeHTTP(w http.ResponseWriter, req *http.Request
return
}

func (h ddlResignOwnerHandler) resignDDLOwner() error {
dom, err := session.GetDomain(h.store)
if err != nil {
return errors.Trace(err)
}

ownerMgr := dom.DDL().OwnerManager()
err = ownerMgr.ResignOwner(context.Background())
if err != nil {
return errors.Trace(err)
}
return nil
}

// ServeHTTP handles request of resigning ddl owner.
func (h ddlResignOwnerHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodPost {
writeError(w, errors.Errorf("This api only support POST method."))
return
}

err := h.resignDDLOwner()
if err != nil {
log.Error(err)
writeError(w, err)
return
}

writeData(w, "success!")
}

func (h tableHandler) getPDAddr() ([]string, error) {
var pdAddrs []string
etcd, ok := h.store.(domain.EtcdBackend)
Expand Down
2 changes: 2 additions & 0 deletions server/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/gorilla/mux"
"github.com/juju/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util/printer"
Expand Down Expand Up @@ -52,6 +53,7 @@ func (s *Server) startHTTPServer() {
router.Handle("/schema/{db}/{table}", schemaHandler{tikvHandlerTool})
router.Handle("/tables/{colID}/{colTp}/{colFlag}/{colLen}", valueHandler{})
router.Handle("/ddl/history", ddlHistoryJobHandler{tikvHandlerTool})
router.Handle("/ddl/owner/resign", ddlResignOwnerHandler{tikvHandlerTool.store.(kv.Storage)})

// HTTP path for get server info.
router.Handle("/info", serverInfoHandler{tikvHandlerTool})
Expand Down

0 comments on commit f497444

Please sign in to comment.