forked from pingcap/tidb
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
domain: schema lease manage abstraction (pingcap#2137)
- Loading branch information
1 parent
5f96f67
commit c29ebe0
Showing
2 changed files
with
191 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
// Copyright 2016 PingCAP, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package domain | ||
|
||
import ( | ||
"sync" | ||
"time" | ||
) | ||
|
||
// SchemaValidator is the interface for checking the validity of schema version. | ||
type SchemaValidator interface { | ||
// Update the schema validator, add a new item, delete the expired items. | ||
// The schemaVer is valid within leaseGrantTime plus lease duration. | ||
Update(leaseGrantTime uint64, schemaVer int64) | ||
// Check is it valid for a transaction to use schemaVer, at timestamp txnTS. | ||
Check(txnTS uint64, schemaVer int64) bool | ||
// Latest returns the latest schema version it knows, but not necessary a valid one. | ||
Latest() int64 | ||
} | ||
|
||
type schemaValidator struct { | ||
mux sync.RWMutex | ||
lease time.Duration | ||
items map[int64]time.Time | ||
latestSchemaVer int64 | ||
} | ||
|
||
func newSchemaValidator(lease time.Duration) SchemaValidator { | ||
return &schemaValidator{ | ||
lease: lease, | ||
items: make(map[int64]time.Time), | ||
} | ||
} | ||
|
||
func (s *schemaValidator) Update(leaseGrantTS uint64, schemaVer int64) { | ||
s.mux.Lock() | ||
|
||
s.latestSchemaVer = schemaVer | ||
leaseGrantTime := extractPhysicalTime(leaseGrantTS) | ||
leaseExpire := leaseGrantTime.Add(s.lease) | ||
|
||
// Renewal lease. | ||
s.items[schemaVer] = leaseExpire | ||
|
||
// Delete expired items, leaseGrantTime is server current time, actually. | ||
for k, expire := range s.items { | ||
if leaseGrantTime.After(expire) { | ||
delete(s.items, k) | ||
} | ||
} | ||
|
||
s.mux.Unlock() | ||
} | ||
|
||
// Check checks schema validity, returns true if use schemaVer at txnTS is legal. | ||
func (s *schemaValidator) Check(txnTS uint64, schemaVer int64) bool { | ||
s.mux.RLock() | ||
defer s.mux.RUnlock() | ||
|
||
if s.lease == 0 { | ||
return true | ||
} | ||
|
||
expire, ok := s.items[schemaVer] | ||
if !ok { | ||
// Can't find schema version means it's already expired. | ||
return false | ||
} | ||
|
||
t := extractPhysicalTime(txnTS) | ||
if t.After(expire) { | ||
return false | ||
} | ||
|
||
return true | ||
} | ||
|
||
// Latest returns the latest schema version it knows. | ||
func (s *schemaValidator) Latest() int64 { | ||
return s.latestSchemaVer | ||
} | ||
|
||
func extractPhysicalTime(ts uint64) time.Time { | ||
t := int64(ts >> 18) // 18 for physicalShiftBits | ||
return time.Unix(t/1e3, (t%1e3)*1e6) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
// Copyright 2016 PingCAP, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package domain | ||
|
||
import ( | ||
"math/rand" | ||
"time" | ||
|
||
. "github.com/pingcap/check" | ||
"github.com/pingcap/tidb/util/testleak" | ||
) | ||
|
||
type leaseGrantItem struct { | ||
leaseGrantTS uint64 | ||
schemaVer int64 | ||
} | ||
|
||
func (*testSuite) TestSchemaValidator(c *C) { | ||
defer testleak.AfterTest(c)() | ||
lease := 2 * time.Millisecond | ||
leaseGrantCh := make(chan leaseGrantItem) | ||
oracleCh := make(chan uint64) | ||
exit := make(chan struct{}) | ||
go serverFunc(lease, leaseGrantCh, oracleCh, exit) | ||
|
||
validator := newSchemaValidator(lease) | ||
|
||
for i := 0; i < 10; i++ { | ||
delay := time.Duration(100+rand.Intn(900)) * time.Microsecond | ||
time.Sleep(delay) | ||
// Reload can run arbitrarily, at any time. | ||
reload(validator, leaseGrantCh) | ||
} | ||
|
||
// Take a lease, check it's valid. | ||
item := <-leaseGrantCh | ||
validator.Update(item.leaseGrantTS, item.schemaVer) | ||
valid := validator.Check(item.leaseGrantTS, item.schemaVer) | ||
c.Assert(valid, IsTrue) | ||
|
||
// Sleep for a long time, check schema is invalid. | ||
time.Sleep(lease) | ||
ts := <-oracleCh | ||
valid = validator.Check(ts, item.schemaVer) | ||
c.Assert(valid, IsFalse) | ||
|
||
reload(validator, leaseGrantCh) | ||
valid = validator.Check(ts, item.schemaVer) | ||
c.Assert(valid, IsFalse) | ||
|
||
// Check the latest schema version must changed. | ||
c.Assert(item.schemaVer, Less, validator.Latest()) | ||
|
||
exit <- struct{}{} | ||
} | ||
|
||
func reload(validator SchemaValidator, leaseGrantCh chan leaseGrantItem) { | ||
item := <-leaseGrantCh | ||
validator.Update(item.leaseGrantTS, item.schemaVer) | ||
} | ||
|
||
// serverFunc plays the role as a remote server, runs in a seperate goroutine. | ||
// It can grant lease and provide timestamp oracle. | ||
// Caller should communicate with it through channel to mock network. | ||
func serverFunc(lease time.Duration, requireLease chan leaseGrantItem, oracleCh chan uint64, exit chan struct{}) { | ||
var version int64 | ||
leaseTS := uint64(time.Now().UnixNano()) | ||
ticker := time.NewTicker(lease) | ||
for { | ||
select { | ||
case <-ticker.C: | ||
version++ | ||
leaseTS = uint64(time.Now().UnixNano()) | ||
case requireLease <- leaseGrantItem{ | ||
leaseGrantTS: leaseTS, | ||
schemaVer: version, | ||
}: | ||
case oracleCh <- uint64(time.Now().UnixNano()): | ||
case <-exit: | ||
return | ||
} | ||
} | ||
} |