Skip to content

Commit

Permalink
*: implement priority control for OOM Action (pingcap#21170)
Browse files Browse the repository at this point in the history
  • Loading branch information
wshwsh12 authored Nov 25, 2020
1 parent 205f77b commit af58658
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 43 deletions.
7 changes: 5 additions & 2 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ const (

// globalPanicOnExceed panics when GlobalDisTracker storage usage exceeds storage quota.
type globalPanicOnExceed struct {
memory.BaseOOMAction
mutex sync.Mutex // For synchronization.
}

Expand Down Expand Up @@ -142,8 +143,10 @@ func (a *globalPanicOnExceed) Action(t *memory.Tracker) {
panic(msg)
}

// SetFallback sets a fallback action.
func (a *globalPanicOnExceed) SetFallback(memory.ActionOnExceed) {}
// GetPriority get the priority of the Action
func (a *globalPanicOnExceed) GetPriority() int64 {
return memory.DefPanicPriority
}

// base returns the baseExecutor of an executor, don't override this method!
func (e *baseExecutor) base() *baseExecutor {
Expand Down
33 changes: 33 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ import (
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/gcutil"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tidb/util/testkit"
Expand Down Expand Up @@ -7004,3 +7005,35 @@ func (s *testSuite) TestIssue20305(c *C) {
tk.MustExec("INSERT INTO `t3` VALUES (2069, 70), (2010, 11), (2155, 2156), (2069, 69)")
tk.MustQuery("SELECT * FROM `t3` where y <= a").Check(testkit.Rows("2155 2156"))
}

func (s *testSuite) TestOOMActionPriority(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t0")
tk.MustExec("drop table if exists t1")
tk.MustExec("drop table if exists t2")
tk.MustExec("drop table if exists t3")
tk.MustExec("drop table if exists t4")
tk.MustExec("create table t0(a int)")
tk.MustExec("insert into t0 values(1)")
tk.MustExec("create table t1(a int)")
tk.MustExec("insert into t1 values(1)")
tk.MustExec("create table t2(a int)")
tk.MustExec("insert into t2 values(1)")
tk.MustExec("create table t3(a int)")
tk.MustExec("insert into t3 values(1)")
tk.MustExec("create table t4(a int)")
tk.MustExec("insert into t4 values(1)")
tk.MustQuery("select * from t0 join t1 join t2 join t3 join t4 order by t0.a").Check(testkit.Rows("1 1 1 1 1"))
action := tk.Se.GetSessionVars().StmtCtx.MemTracker.GetFallbackForTest()
// check the first 5 actions is rate limit.
for i := 0; i < 5; i++ {
c.Assert(action.GetPriority(), Equals, int64(memory.DefRateLimitPriority))
action = action.GetFallback()
}
for action.GetFallback() != nil {
c.Assert(action.GetPriority(), Equals, int64(memory.DefSpillPriority))
action = action.GetFallback()
}
c.Assert(action.GetPriority(), Equals, int64(memory.DefLogPriority))
}
18 changes: 9 additions & 9 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1308,9 +1308,9 @@ func (it copErrorResponse) Close() error {
// set on initial. Each time the Action is triggered, one token would be destroyed. If the count of the token is less
// than 2, the action would be delegated to the fallback action.
type rateLimitAction struct {
memory.BaseOOMAction
// enabled indicates whether the rateLimitAction is permitted to Action. 1 means permitted, 0 denied.
enabled uint32
fallbackAction memory.ActionOnExceed
enabled uint32
// totalTokenNum indicates the total token at initial
totalTokenNum uint
cond struct {
Expand Down Expand Up @@ -1352,8 +1352,8 @@ func (e *rateLimitAction) Action(t *memory.Tracker) {
})

if !e.isEnabled() {
if e.fallbackAction != nil {
e.fallbackAction.Action(t)
if fallback := e.GetFallback(); fallback != nil {
fallback.Action(t)
}
return
}
Expand All @@ -1364,8 +1364,8 @@ func (e *rateLimitAction) Action(t *memory.Tracker) {
e.setEnabled(false)
logutil.BgLogger().Info("memory exceed quota, rateLimitAction delegate to fallback action",
zap.Uint("total token count", e.totalTokenNum))
if e.fallbackAction != nil {
e.fallbackAction.Action(t)
if fallback := e.GetFallback(); fallback != nil {
fallback.Action(t)
}
return
}
Expand All @@ -1391,9 +1391,9 @@ func (e *rateLimitAction) SetLogHook(hook func(uint64)) {

}

// SetFallback implements ActionOnExceed.SetFallback
func (e *rateLimitAction) SetFallback(a memory.ActionOnExceed) {
e.fallbackAction = a
// GetPriority get the priority of the Action.
func (e *rateLimitAction) GetPriority() int64 {
return memory.DefRateLimitPriority
}

// destroyTokenIfNeeded will check the `exceed` flag after copWorker finished one task.
Expand Down
33 changes: 14 additions & 19 deletions util/chunk/row_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,11 +259,11 @@ func (c *RowContainer) ActionSpillForTest() *SpillDiskAction {
// the memory quota of a query is exceeded, SpillDiskAction.Action is
// triggered.
type SpillDiskAction struct {
c *RowContainer
fallbackAction memory.ActionOnExceed
m sync.Mutex
once sync.Once
cond spillStatusCond
memory.BaseOOMAction
c *RowContainer
m sync.Mutex
once sync.Once
cond spillStatusCond

// test function only used for test sync.
testSyncInputFunc func()
Expand Down Expand Up @@ -333,8 +333,8 @@ func (a *SpillDiskAction) Action(t *memory.Tracker) {
if !t.CheckExceed() {
return
}
if a.fallbackAction != nil {
a.fallbackAction.Action(t)
if fallback := a.GetFallback(); fallback != nil {
fallback.Action(t)
}
}

Expand All @@ -346,14 +346,14 @@ func (a *SpillDiskAction) Reset() {
a.once = sync.Once{}
}

// SetFallback sets the fallback action.
func (a *SpillDiskAction) SetFallback(fallback memory.ActionOnExceed) {
a.fallbackAction = fallback
}

// SetLogHook sets the hook, it does nothing just to form the memory.ActionOnExceed interface.
func (a *SpillDiskAction) SetLogHook(hook func(uint64)) {}

// GetPriority get the priority of the Action.
func (a *SpillDiskAction) GetPriority() int64 {
return memory.DefSpillPriority
}

// WaitForTest waits all goroutine have gone.
func (a *SpillDiskAction) WaitForTest() {
a.testWg.Wait()
Expand Down Expand Up @@ -528,16 +528,11 @@ func (a *SortAndSpillDiskAction) Action(t *memory.Tracker) {
if !t.CheckExceed() {
return
}
if a.fallbackAction != nil {
a.fallbackAction.Action(t)
if fallback := a.GetFallback(); fallback != nil {
fallback.Action(t)
}
}

// SetFallback sets the fallback action.
func (a *SortAndSpillDiskAction) SetFallback(fallback memory.ActionOnExceed) {
a.fallbackAction = fallback
}

// SetLogHook sets the hook, it does nothing just to form the memory.ActionOnExceed interface.
func (a *SortAndSpillDiskAction) SetLogHook(hook func(uint64)) {}

Expand Down
42 changes: 38 additions & 4 deletions util/memory/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,39 @@ type ActionOnExceed interface {
// SetFallback sets a fallback action which will be triggered if itself has
// already been triggered.
SetFallback(a ActionOnExceed)
// GetFallback get the fallback action of the Action.
GetFallback() ActionOnExceed
// GetPriority get the priority of the Action.
GetPriority() int64
}

// BaseOOMAction manages the fallback action for all Action.
type BaseOOMAction struct {
fallbackAction ActionOnExceed
}

// SetFallback sets a fallback action which will be triggered if itself has
// already been triggered.
func (b *BaseOOMAction) SetFallback(a ActionOnExceed) {
b.fallbackAction = a
}

// GetFallback get the fallback action of the Action.
func (b *BaseOOMAction) GetFallback() ActionOnExceed {
return b.fallbackAction
}

// Default OOM Action priority.
const (
DefPanicPriority = iota
DefLogPriority
DefSpillPriority
DefRateLimitPriority
)

// LogOnExceed logs a warning only once when memory usage exceeds memory quota.
type LogOnExceed struct {
BaseOOMAction
mutex sync.Mutex // For synchronization.
acted bool
ConnID uint64
Expand All @@ -65,11 +94,14 @@ func (a *LogOnExceed) Action(t *Tracker) {
}
}

// SetFallback sets a fallback action.
func (a *LogOnExceed) SetFallback(ActionOnExceed) {}
// GetPriority get the priority of the Action
func (a *LogOnExceed) GetPriority() int64 {
return DefLogPriority
}

// PanicOnExceed panics when memory usage exceeds memory quota.
type PanicOnExceed struct {
BaseOOMAction
mutex sync.Mutex // For synchronization.
acted bool
ConnID uint64
Expand All @@ -96,8 +128,10 @@ func (a *PanicOnExceed) Action(t *Tracker) {
panic(PanicMemoryExceed + fmt.Sprintf("[conn_id=%d]", a.ConnID))
}

// SetFallback sets a fallback action.
func (a *PanicOnExceed) SetFallback(ActionOnExceed) {}
// GetPriority get the priority of the Action
func (a *PanicOnExceed) GetPriority() int64 {
return DefPanicPriority
}

var (
errMemExceedThreshold = dbterror.ClassUtil.NewStd(errno.ErrMemExceedThreshold)
Expand Down
27 changes: 25 additions & 2 deletions util/memory/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,31 @@ func (t *Tracker) SetActionOnExceed(a ActionOnExceed) {
func (t *Tracker) FallbackOldAndSetNewAction(a ActionOnExceed) {
t.actionMu.Lock()
defer t.actionMu.Unlock()
a.SetFallback(t.actionMu.actionOnExceed)
t.actionMu.actionOnExceed = a
t.actionMu.actionOnExceed = reArrangeFallback(t.actionMu.actionOnExceed, a)
}

// GetFallbackForTest get the oom action used by test.
func (t *Tracker) GetFallbackForTest() ActionOnExceed {
t.actionMu.Lock()
defer t.actionMu.Unlock()
return t.actionMu.actionOnExceed
}

// reArrangeFallback merge two action chains and rearrange them by priority in descending order.
func reArrangeFallback(a ActionOnExceed, b ActionOnExceed) ActionOnExceed {
if a == nil {
return b
}
if b == nil {
return a
}
if a.GetPriority() < b.GetPriority() {
a, b = b, a
a.SetFallback(b)
} else {
a.SetFallback(reArrangeFallback(a.GetFallback(), b))
}
return a
}

// SetLabel sets the label of a Tracker.
Expand Down
50 changes: 43 additions & 7 deletions util/memory/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,31 +106,32 @@ func (s *testSuite) TestOOMAction(c *C) {
c.Assert(action1.called, IsFalse)
c.Assert(action2.called, IsFalse)
tracker.Consume(10000)
c.Assert(action1.called, IsFalse)
c.Assert(action2.called, IsTrue)
c.Assert(action1.called, IsTrue)
c.Assert(action2.called, IsFalse)
tracker.Consume(10000)
c.Assert(action1.called, IsTrue)
c.Assert(action2.called, IsTrue)
}

type mockAction struct {
BaseOOMAction
called bool
fallback ActionOnExceed
priority int64
}

func (a *mockAction) SetLogHook(hook func(uint64)) {
}

func (a *mockAction) Action(t *Tracker) {
if a.called && a.fallback != nil {
a.fallback.Action(t)
if a.called && a.fallbackAction != nil {
a.fallbackAction.Action(t)
return
}
a.called = true
}

func (a *mockAction) SetFallback(fallback ActionOnExceed) {
a.fallback = fallback
func (a *mockAction) GetPriority() int64 {
return a.priority
}

func (s *testSuite) TestAttachTo(c *C) {
Expand Down Expand Up @@ -341,3 +342,38 @@ func BenchmarkConsume(b *testing.B) {
func (s *testSuite) TestErrorCode(c *C) {
c.Assert(int(terror.ToSQLError(errMemExceedThreshold).Code), Equals, errno.ErrMemExceedThreshold)
}

func (s *testSuite) TestOOMActionPriority(c *C) {
tracker := NewTracker(1, 100)
// make sure no panic here.
tracker.Consume(10000)

tracker = NewTracker(1, 1)
tracker.actionMu.actionOnExceed = nil
n := 100
actions := make([]*mockAction, n)
for i := 0; i < n; i++ {
actions[i] = &mockAction{priority: int64(i)}
}

randomSuffle := make([]int, n)
for i := 0; i < n; i++ {
randomSuffle[i] = i
pos := rand.Int() % (i + 1)
randomSuffle[i], randomSuffle[pos] = randomSuffle[pos], randomSuffle[i]
}

for i := 0; i < n; i++ {
tracker.FallbackOldAndSetNewAction(actions[randomSuffle[i]])
}
for i := n - 1; i >= 0; i-- {
tracker.Consume(100)
for j := n - 1; j >= 0; j-- {
if j >= i {
c.Assert(actions[j].called, IsTrue)
} else {
c.Assert(actions[j].called, IsFalse)
}
}
}
}

0 comments on commit af58658

Please sign in to comment.