Skip to content

Commit

Permalink
ddl: Add a time limit to check owner (pingcap#1875)
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala authored Oct 26, 2016
1 parent b80ba78 commit b6a2f2c
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 18 deletions.
47 changes: 33 additions & 14 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,38 @@ func asyncNotify(ch chan struct{}) {
}
}

const maxBgOwnerTimeout = int64(10 * time.Minute)
const maxOwnerTimeout = int64(20 * time.Minute)

// We define minBgOwnerTimeout and minDDLOwnerTimeout as variable,
// because we need to change them in test.
var (
minBgOwnerTimeout = int64(20 * time.Second)
minDDLOwnerTimeout = int64(4 * time.Second)
)

func (d *ddl) getCheckOwnerTimeout(flag JobType) int64 {
// we must wait 2 * lease time to guarantee other servers update the schema,
// the owner will update its owner status every 2 * lease time, so here we use
// 4 * lease to check its timeout.
timeout := int64(4 * d.lease)
if timeout > maxOwnerTimeout {
return maxOwnerTimeout
}

// The value of lease may be less than 1 second, so the operation of
// checking owner is frequent and it isn't necessary.
// So if timeout is less than 4 second, we will use default minDDLOwnerTimeout.
if flag == ddlJobFlag && timeout < minDDLOwnerTimeout {
return minDDLOwnerTimeout
}
if flag == bgJobFlag && timeout < minBgOwnerTimeout {
// Background job is serial processing, so we can extend the owner timeout to make sure
// a batch of rows will be processed before timeout.
// If timeout is less than maxBgOwnerTimeout, we will use default minBgOwnerTimeout.
return minBgOwnerTimeout
}
return timeout
}

func (d *ddl) checkOwner(t *meta.Meta, flag JobType) (*model.Owner, error) {
owner, err := d.getJobOwner(t, flag)
Expand All @@ -167,19 +198,7 @@ func (d *ddl) checkOwner(t *meta.Meta, flag JobType) (*model.Owner, error) {
}

now := time.Now().UnixNano()
// we must wait 2 * lease time to guarantee other servers update the schema,
// the owner will update its owner status every 2 * lease time, so here we use
// 4 * lease to check its timeout.
maxTimeout := int64(4 * d.lease)
if flag == bgJobFlag {
// Background job is serial processing, so we can extend the owner timeout to make sure
// a batch of rows will be processed before timeout. So here we use 20 * lease to check its timeout.
maxTimeout = int64(20 * d.lease)
// If 20 * lease is greater than maxBgOwnerTimeout, we will use default maxBgOwnerTimeout.
if maxTimeout > maxBgOwnerTimeout {
maxTimeout = maxBgOwnerTimeout
}
}
maxTimeout := d.getCheckOwnerTimeout(flag)
sub := now - owner.LastUpdateTS
if owner.OwnerID == d.uuid || sub > maxTimeout {
owner.OwnerID = d.uuid
Expand Down
22 changes: 18 additions & 4 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ import (

var _ = Suite(&testDDLSuite{})

type testDDLSuite struct {
originMinBgOwnerTimeout int64
originMinDDLOwnerTimeout int64
}

const testLease = 5 * time.Millisecond

func testCreateStore(c *C, name string) kv.Storage {
Expand All @@ -42,9 +47,6 @@ func testCreateStore(c *C, name string) kv.Storage {
return store
}

type testDDLSuite struct {
}

func testCheckOwner(c *C, d *ddl, isOwner bool, flag JobType) {
err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
Expand All @@ -59,6 +61,18 @@ func testCheckOwner(c *C, d *ddl, isOwner bool, flag JobType) {
c.Assert(terror.ErrorEqual(err, errNotOwner), IsTrue)
}

func (s *testDDLSuite) SetUpSuite(c *C) {
s.originMinDDLOwnerTimeout = minDDLOwnerTimeout
s.originMinBgOwnerTimeout = minBgOwnerTimeout
minDDLOwnerTimeout = int64(4 * testLease)
minBgOwnerTimeout = int64(4 * testLease)
}

func (s *testDDLSuite) TearDownSuite(c *C) {
minDDLOwnerTimeout = s.originMinDDLOwnerTimeout
minBgOwnerTimeout = s.originMinBgOwnerTimeout
}

func (s *testDDLSuite) TestCheckOwner(c *C) {
defer testleak.AfterTest(c)()
store := testCreateStore(c, "test_owner")
Expand All @@ -80,7 +94,7 @@ func (s *testDDLSuite) TestCheckOwner(c *C) {
d1.close()

// Make sure owner is changed.
time.Sleep(21 * testLease)
time.Sleep(6 * testLease)

testCheckOwner(c, d2, true, ddlJobFlag)
testCheckOwner(c, d2, true, bgJobFlag)
Expand Down

0 comments on commit b6a2f2c

Please sign in to comment.