Skip to content

Commit

Permalink
exeutor: add a switch for memory tracker in aggregate (pingcap#21597)
Browse files Browse the repository at this point in the history
  • Loading branch information
wshwsh12 authored Dec 16, 2020
1 parent 291d0b8 commit 9f09524
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 2 deletions.
8 changes: 6 additions & 2 deletions executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,9 @@ func (e *HashAggExec) Open(ctx context.Context) error {
e.prepared = false

e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
if e.ctx.GetSessionVars().TrackAggregateMemoryUsage {
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
}

if e.isUnparallelExec {
e.initForUnparallelExec()
Expand Down Expand Up @@ -1015,7 +1017,9 @@ func (e *StreamAggExec) Open(ctx context.Context) error {

// bytesLimit <= 0 means no limit, for now we just track the memory footprint
e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
if e.ctx.GetSessionVars().TrackAggregateMemoryUsage {
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
}
e.memTracker.Consume(e.childResult.MemoryUsage())
return nil
}
Expand Down
19 changes: 19 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7152,6 +7152,25 @@ func (s *testSuite) Test15492(c *C) {
tk.MustQuery("select a + 1 as field1, a as field2 from t order by field1, field2 limit 2").Check(testkit.Rows("2 1", "3 2"))
}

func (s testSuite) TestTrackAggMemoryUsage(c *C) {
tk := testkit.NewTestKit(c, s.store)

tk.MustExec("use test")
tk.MustExec("create table t(a int)")
tk.MustExec("insert into t values(1)")

tk.MustExec("set tidb_track_aggregate_memory_usage = off;")
rows := tk.MustQuery("explain analyze select /*+ HASH_AGG() */ sum(a) from t").Rows()
c.Assert(rows[0][7], Equals, "N/A")
rows = tk.MustQuery("explain analyze select /*+ STREAM_AGG() */ sum(a) from t").Rows()
c.Assert(rows[0][7], Equals, "N/A")
tk.MustExec("set tidb_track_aggregate_memory_usage = on;")
rows = tk.MustQuery("explain analyze select /*+ HASH_AGG() */ sum(a) from t").Rows()
c.Assert(rows[0][7], Not(Equals), "N/A")
rows = tk.MustQuery("explain analyze select /*+ STREAM_AGG() */ sum(a) from t").Rows()
c.Assert(rows[0][7], Not(Equals), "N/A")
}

func (s *testSuite) Test12201(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
1 change: 1 addition & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2295,6 +2295,7 @@ var builtinGlobalVariable = []string{
variable.TiDBEnable1PC,
variable.TiDBGuaranteeExternalConsistency,
variable.TiDBAnalyzeVersion,
variable.TiDBTrackAggregateMemoryUsage,
}

var (
Expand Down
5 changes: 5 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,9 @@ type SessionVars struct {

// AnalyzeVersion indicates how TiDB collect and use analyzed statistics.
AnalyzeVersion int

// TrackAggregateMemoryUsage indicates whether to track the memory usage of aggregate function.
TrackAggregateMemoryUsage bool
}

// CheckAndGetTxnScope will return the transaction scope we should use in the current session.
Expand Down Expand Up @@ -1645,6 +1648,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.GuaranteeExternalConsistency = TiDBOptOn(val)
case TiDBAnalyzeVersion:
s.AnalyzeVersion = tidbOptPositiveInt32(val, DefTiDBAnalyzeVersion)
case TiDBTrackAggregateMemoryUsage:
s.TrackAggregateMemoryUsage = TiDBOptOn(val)
}
s.systems[name] = val
return nil
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -1159,6 +1159,7 @@ var defaultSysVars = []*SysVar{
{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnable1PC, Value: BoolToOnOff(DefTiDBEnable1PC), Type: TypeBool},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBGuaranteeExternalConsistency, Value: BoolToOnOff(DefTiDBGuaranteeExternalConsistency), Type: TypeBool},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBAnalyzeVersion, Value: strconv.Itoa(DefTiDBAnalyzeVersion), Type: TypeInt, MinValue: 1, MaxValue: 2},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBTrackAggregateMemoryUsage, Value: BoolToOnOff(DefTiDBTrackAggregateMemoryUsage), Type: TypeBool},
}

// SynonymsSysVariables is synonyms of system variables.
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,9 @@ const (

// TiDBAnalyzeVersion indicates the how tidb collects the analyzed statistics and how use to it.
TiDBAnalyzeVersion = "tidb_analyze_version"

// TiDBTrackAggregateMemoryUsage indicates whether track the memory usage of aggregate function.
TiDBTrackAggregateMemoryUsage = "tidb_track_aggregate_memory_usage"
)

// Default TiDB system variable values.
Expand Down Expand Up @@ -614,6 +617,7 @@ const (
DefTiDBEnable1PC = false
DefTiDBGuaranteeExternalConsistency = false
DefTiDBAnalyzeVersion = 1
DefTiDBTrackAggregateMemoryUsage = false
)

// Process global variables.
Expand Down

0 comments on commit 9f09524

Please sign in to comment.