Skip to content

Commit

Permalink
*: convert lockKeys to key flag (pingcap#18966)
Browse files Browse the repository at this point in the history
Co-authored-by: ti-srebot <[email protected]>
  • Loading branch information
Zejun Li and ti-srebot authored Aug 7, 2020
1 parent ceff1fc commit 2d54976
Show file tree
Hide file tree
Showing 13 changed files with 203 additions and 144 deletions.
2 changes: 0 additions & 2 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -955,7 +955,6 @@ func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *kv.LockCtx {
LockKeysDuration: &seVars.StmtCtx.LockKeysDuration,
LockKeysCount: &seVars.StmtCtx.LockKeysCount,
LockExpired: &seVars.TxnCtx.LockExpire,
CheckKeyExists: seVars.StmtCtx.CheckKeyExists,
}
}

Expand Down Expand Up @@ -1653,7 +1652,6 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
}

sc.TblInfo2UnionScan = make(map[*model.TableInfo]bool)
sc.CheckKeyExists = make(map[string]struct{})
errCount, warnCount := vars.StmtCtx.NumErrorWarnings()
vars.SysErrorCount = errCount
vars.SysWarningCount = warnCount
Expand Down
15 changes: 13 additions & 2 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,16 +160,28 @@ type RetrieverMutator interface {
Mutator
}

// MemBufferIterator is an Iterator with KeyFlags related functions.
type MemBufferIterator interface {
Iterator
HasValue() bool
Flags() KeyFlags
}

// MemBuffer is an in-memory kv collection, can be used to buffer write operations.
type MemBuffer interface {
RetrieverMutator

// GetFlags returns the latest flags associated with key.
GetFlags(Key) (KeyFlags, error)
// IterWithFlags returns a MemBufferIterator.
IterWithFlags(k Key, upperBound Key) MemBufferIterator
// IterReverseWithFlags returns a reversed MemBufferIterator.
IterReverseWithFlags(k Key) MemBufferIterator
// SetWithFlags put key-value into the last active staging buffer with the given KeyFlags.
SetWithFlags(Key, []byte, ...FlagsOp) error
// UpdateFlags update the flags associated with key.
UpdateFlags(Key, ...FlagsOp)

// Reset reset the MemBuffer to initial states.
Reset()
// DiscardValues releases the memory used by all values.
Expand All @@ -186,7 +198,7 @@ type MemBuffer interface {
// If the changes are not published by `Release`, they will be discarded.
Cleanup(StagingHandle)
// InspectStage used to inspect the value updates in the given stage.
InspectStage(handle StagingHandle, f func(Key, KeyFlags, []byte))
InspectStage(StagingHandle, func(Key, KeyFlags, []byte))

// Size returns sum of keys and values length.
Size() int
Expand Down Expand Up @@ -256,7 +268,6 @@ type LockCtx struct {
Values map[string]ReturnedValue
ValuesLock sync.Mutex
LockExpired *uint32
CheckKeyExists map[string]struct{}
}

// ReturnedValue pairs the Value and AlreadyLocked flag for PessimisticLock return values result.
Expand Down
99 changes: 66 additions & 33 deletions kv/memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ import (

const (
flagPresumeKNE KeyFlags = 1 << iota
flagPessimisticLock
flagKeyLocked
flagKeyLockedValExist
flagNeedCheckExists
flagNoNeedCommit

persistentFlags = flagPessimisticLock
persistentFlags = flagKeyLocked | flagKeyLockedValExist
// bit 1 => red, bit 0 => black
nodeColorBit uint8 = 0x80
nodeFlagsMask = ^nodeColorBit
Expand All @@ -35,14 +37,24 @@ const (
// KeyFlags are metadata associated with key
type KeyFlags uint8

// HasPresumeKeyNotExists retruns whether the associated key use lazy check.
// HasPresumeKeyNotExists returns whether the associated key use lazy check.
func (f KeyFlags) HasPresumeKeyNotExists() bool {
return f&flagPresumeKNE != 0
}

// HasPessimisticLock retruns whether the associated key has acquired pessimistic lock.
func (f KeyFlags) HasPessimisticLock() bool {
return f&flagPessimisticLock != 0
// HasLocked returns whether the associated key has acquired pessimistic lock.
func (f KeyFlags) HasLocked() bool {
return f&flagKeyLocked != 0
}

// HasLockedValueExists returns whether the value exists when key locked.
func (f KeyFlags) HasLockedValueExists() bool {
return f&flagKeyLockedValExist != 0
}

// HasNeedCheckExists returns whether the key need to check existence when it has been locked.
func (f KeyFlags) HasNeedCheckExists() bool {
return f&flagNeedCheckExists != 0
}

// HasNoNeedCommit returns whether the key should be used in 2pc commit phase.
Expand All @@ -51,17 +63,24 @@ func (f KeyFlags) HasNoNeedCommit() bool {
}

// FlagsOp describes KeyFlags modify operation.
type FlagsOp uint8
type FlagsOp uint16

const (
// SetPresumeKeyNotExists marks the existence of the associated key is checked lazily.
// Implies KeyFlags.HasNeedCheckExists() == true.
SetPresumeKeyNotExists FlagsOp = 1 << iota
// DelPresumeKeyNotExists reverts SetPresumeKeyNotExists.
DelPresumeKeyNotExists
// SetPessimisticLock marks the associated key has acquired pessimistic lock.
SetPessimisticLock
// DelPessimisticLock reverts SetPessimisticLock.
DelPessimisticLock
// SetKeyLocked marks the associated key has acquired lock.
SetKeyLocked
// DelKeyLocked reverts SetKeyLocked.
DelKeyLocked
// SetKeyLockedValueExists marks the value exists when key has been locked in Transaction.LockKeys.
SetKeyLockedValueExists
// SetKeyLockedValueNotExists marks the value doesn't exists when key has been locked in Transaction.LockKeys.
SetKeyLockedValueNotExists
// DelNeedCheckExists marks the key no need to be checked in Transaction.LockKeys.
DelNeedCheckExists
// SetNoNeedCommit marks the key shouldn't be used in 2pc commit phase.
SetNoNeedCommit
)
Expand All @@ -70,13 +89,19 @@ func applyFlagsOps(origin KeyFlags, ops ...FlagsOp) KeyFlags {
for _, op := range ops {
switch op {
case SetPresumeKeyNotExists:
origin |= flagPresumeKNE
origin |= flagPresumeKNE | flagNeedCheckExists
case DelPresumeKeyNotExists:
origin &= ^flagPresumeKNE
case SetPessimisticLock:
origin |= flagPessimisticLock
case DelPessimisticLock:
origin &= ^flagPessimisticLock
origin &= ^(flagPresumeKNE | flagNeedCheckExists)
case SetKeyLocked:
origin |= flagKeyLocked
case DelKeyLocked:
origin &= ^flagKeyLocked
case SetKeyLockedValueExists:
origin |= flagKeyLockedValExist
case DelNeedCheckExists:
origin &= ^flagNeedCheckExists
case SetKeyLockedValueNotExists:
origin &= ^flagKeyLockedValExist
case SetNoNeedCommit:
origin |= flagNoNeedCommit
}
Expand All @@ -92,7 +117,7 @@ var tombstone = []byte{}
// The value map is rollbackable, that means you can use the `Staging`, `Release` and `Cleanup` API to safely modify KVs.
//
// The flags map is not rollbackable. There are two types of flag, persistent and non-persistent.
// When discading a newly added KV in `Cleanup`, the non-persistent flags will be cleared.
// When discarding a newly added KV in `Cleanup`, the non-persistent flags will be cleared.
// If there are persistent flags associated with key, we will keep this key in node without value.
type memdb struct {
root memdbArenaAddr
Expand Down Expand Up @@ -126,7 +151,7 @@ func (db *memdb) Staging() StagingHandle {

func (db *memdb) Release(h StagingHandle) {
if int(h) != len(db.stages) {
// This should never happens in production environmen.
// This should never happens in production environment.
// Use panic to make debug easier.
panic("cannot release staging buffer")
}
Expand All @@ -144,7 +169,7 @@ func (db *memdb) Cleanup(h StagingHandle) {
return
}
if int(h) < len(db.stages) {
// This should never happens in production environmen.
// This should never happens in production environment.
// Use panic to make debug easier.
panic("cannot cleanup staging buffer")
}
Expand Down Expand Up @@ -186,7 +211,7 @@ func (db *memdb) Get(_ context.Context, key Key) ([]byte, error) {
panic("vlog is resetted")
}

x := db.tranverse(key, false)
x := db.traverse(key, false)
if x.isNull() {
return nil, ErrNotExist
}
Expand All @@ -198,7 +223,7 @@ func (db *memdb) Get(_ context.Context, key Key) ([]byte, error) {
}

func (db *memdb) GetFlags(key Key) (KeyFlags, error) {
x := db.tranverse(key, false)
x := db.traverse(key, false)
if x.isNull() {
return 0, ErrNotExist
}
Expand Down Expand Up @@ -254,14 +279,13 @@ func (db *memdb) set(key Key, value []byte, ops ...FlagsOp) error {
if len(db.stages) == 0 {
db.dirty = true
}
x := db.tranverse(key, true)
if x.vptr.isNull() && value != nil {
db.size += len(key)
db.count++
}
x := db.traverse(key, true)

if len(ops) != 0 {
flags := applyFlagsOps(x.getKeyFlags(), ops...)
if flags&persistentFlags != 0 {
db.dirty = true
}
x.setKeyFlags(flags)
}

Expand Down Expand Up @@ -299,9 +323,9 @@ func (db *memdb) setValue(x memdbNodeAddr, value []byte) {
db.size = db.size - len(oldVal) + len(value)
}

// tranverse search for and if not found and insert is true, will add a new node in.
// traverse search for and if not found and insert is true, will add a new node in.
// Returns a pointer to the new node, or the node found.
func (db *memdb) tranverse(key Key, insert bool) memdbNodeAddr {
func (db *memdb) traverse(key Key, insert bool) memdbNodeAddr {
x := db.getRoot()
y := memdbNodeAddr{nil, nullAddr}
found := false
Expand Down Expand Up @@ -437,7 +461,8 @@ func (db *memdb) leftRotate(x memdbNodeAddr) {

// If B is not null, set it's parent to be X
if !y.left.isNull() {
y.getLeft(db).up = x.addr
left := y.getLeft(db)
left.up = x.addr
}

// Set Y's parent to be what X's parent was
Expand Down Expand Up @@ -470,7 +495,8 @@ func (db *memdb) rightRotate(y memdbNodeAddr) {

// If B is not null, set it's parent to be Y
if !x.right.isNull() {
x.getRight(db).up = y.addr
right := x.getRight(db)
right.up = y.addr
}

// Set X's parent to be what Y's parent was
Expand Down Expand Up @@ -498,6 +524,9 @@ func (db *memdb) rightRotate(y memdbNodeAddr) {
func (db *memdb) deleteNode(z memdbNodeAddr) {
var x, y memdbNodeAddr

db.count--
db.size -= int(z.klen)

if z.left.isNull() || z.right.isNull() {
y = z
} else {
Expand Down Expand Up @@ -551,10 +580,12 @@ func (db *memdb) replaceNode(old memdbNodeAddr, new memdbNodeAddr) {
}
new.up = old.up

old.getLeft(db).up = new.addr
left := old.getLeft(db)
left.up = new.addr
new.left = old.left

old.getRight(db).up = new.addr
right := old.getRight(db)
right.up = new.addr
new.right = old.right

if old.isBlack() {
Expand Down Expand Up @@ -693,6 +724,8 @@ func (db *memdb) getRoot() memdbNodeAddr {
}

func (db *memdb) allocNode(key Key) memdbNodeAddr {
db.size += len(key)
db.count++
x, xn := db.allocator.allocNode(key)
return memdbNodeAddr{xn, x}
}
Expand Down
2 changes: 0 additions & 2 deletions kv/memdb_arena.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,6 @@ func (l *memdbVlog) revertToCheckpoint(db *memdb, cp *memdbCheckpoint) {
db.size -= int(hdr.valueLen)
// oldValue.isNull() == true means this is a newly added value.
if hdr.oldValue.isNull() {
db.count--
db.size -= int(node.klen)
// If there are no flags associated with this key, we need to delete this node.
keptFlags := node.getKeyFlags() & persistentFlags
if keptFlags == 0 {
Expand Down
30 changes: 30 additions & 0 deletions kv/memdb_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,28 @@ func (db *memdb) IterReverse(k Key) (Iterator, error) {
return i, nil
}

func (db *memdb) IterWithFlags(k Key, upperBound Key) MemBufferIterator {
i := &memdbIterator{
db: db,
start: k,
end: upperBound,
includeFlags: true,
}
i.init()
return i
}

func (db *memdb) IterReverseWithFlags(k Key) MemBufferIterator {
i := &memdbIterator{
db: db,
end: k,
reverse: true,
includeFlags: true,
}
i.init()
return i
}

func (i *memdbIterator) init() {
if i.reverse {
if len(i.end) == 0 {
Expand Down Expand Up @@ -72,6 +94,14 @@ func (i *memdbIterator) Valid() bool {
return !i.curr.isNull()
}

func (i *memdbIterator) Flags() KeyFlags {
return i.curr.getKeyFlags()
}

func (i *memdbIterator) HasValue() bool {
return !i.isFlagsOnly()
}

func (i *memdbIterator) Key() Key {
return i.curr.getKey()
}
Expand Down
Loading

0 comments on commit 2d54976

Please sign in to comment.