Skip to content

Commit

Permalink
planner/core, session, sessionctx/variable: add session variable to c…
Browse files Browse the repository at this point in the history
…ontrol the concurrency of shuffle merge join (pingcap#21332)
  • Loading branch information
huang-b authored Nov 27, 2020
1 parent 55c106a commit cc0e1d4
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 3 deletions.
3 changes: 1 addition & 2 deletions planner/core/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,7 @@ func optimizeByShuffle4StreamAgg(pp *PhysicalStreamAgg, ctx sessionctx.Context)
}

func optimizeByShuffle4MergeJoin(pp *PhysicalMergeJoin, ctx sessionctx.Context) *PhysicalShuffle {
// TODO: should be configured by a session variable
concurrency := 1 // disable by default
concurrency := ctx.GetSessionVars().MergeJoinConcurrency()
if concurrency <= 1 {
return nil
}
Expand Down
1 change: 1 addition & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2222,6 +2222,7 @@ var builtinGlobalVariable = []string{
variable.TiDBHashAggPartialConcurrency,
variable.TiDBHashAggFinalConcurrency,
variable.TiDBWindowConcurrency,
variable.TiDBMergeJoinConcurrency,
variable.TiDBStreamAggConcurrency,
variable.TiDBExecutorConcurrency,
variable.TiDBBackoffLockFast,
Expand Down
19 changes: 19 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,7 @@ func NewSessionVars() *SessionVars {
hashAggPartialConcurrency: DefTiDBHashAggPartialConcurrency,
hashAggFinalConcurrency: DefTiDBHashAggFinalConcurrency,
windowConcurrency: DefTiDBWindowConcurrency,
mergeJoinConcurrency: DefTiDBMergeJoinConcurrency,
streamAggConcurrency: DefTiDBStreamAggConcurrency,
ExecutorConcurrency: DefExecutorConcurrency,
}
Expand Down Expand Up @@ -1362,6 +1363,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.hashAggFinalConcurrency = tidbOptPositiveInt32(val, ConcurrencyUnset)
case TiDBWindowConcurrency:
s.windowConcurrency = tidbOptPositiveInt32(val, ConcurrencyUnset)
case TiDBMergeJoinConcurrency:
s.mergeJoinConcurrency = tidbOptPositiveInt32(val, ConcurrencyUnset)
case TiDBStreamAggConcurrency:
s.streamAggConcurrency = tidbOptPositiveInt32(val, ConcurrencyUnset)
case TiDBDistSQLScanConcurrency:
Expand Down Expand Up @@ -1727,6 +1730,9 @@ type Concurrency struct {
// windowConcurrency is deprecated, use ExecutorConcurrency instead.
windowConcurrency int

// mergeJoinConcurrency is the number of concurrent merge join worker
mergeJoinConcurrency int

// streamAggConcurrency is the number of concurrent stream aggregation worker.
// streamAggConcurrency is deprecated, use ExecutorConcurrency instead.
streamAggConcurrency int
Expand Down Expand Up @@ -1781,6 +1787,11 @@ func (c *Concurrency) SetWindowConcurrency(n int) {
c.windowConcurrency = n
}

// SetMergeJoinConcurrency set the number of concurrent merge join worker.
func (c *Concurrency) SetMergeJoinConcurrency(n int) {
c.mergeJoinConcurrency = n
}

// SetStreamAggConcurrency set the number of concurrent stream aggregation worker.
func (c *Concurrency) SetStreamAggConcurrency(n int) {
c.streamAggConcurrency = n
Expand Down Expand Up @@ -1852,6 +1863,14 @@ func (c *Concurrency) WindowConcurrency() int {
return c.ExecutorConcurrency
}

// MergeJoinConcurrency return the number of concurrent merge join worker.
func (c *Concurrency) MergeJoinConcurrency() int {
if c.mergeJoinConcurrency != ConcurrencyUnset {
return c.mergeJoinConcurrency
}
return c.ExecutorConcurrency
}

// StreamAggConcurrency return the number of concurrent stream aggregation worker.
func (c *Concurrency) StreamAggConcurrency() int {
if c.streamAggConcurrency != ConcurrencyUnset {
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -1017,6 +1017,7 @@ var defaultSysVars = []*SysVar{
{Scope: ScopeGlobal | ScopeSession, Name: TiDBHashAggPartialConcurrency, Value: strconv.Itoa(DefTiDBHashAggPartialConcurrency), Type: TypeInt, MinValue: 1, MaxValue: math.MaxInt64, AllowAutoValue: true},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBHashAggFinalConcurrency, Value: strconv.Itoa(DefTiDBHashAggFinalConcurrency), Type: TypeInt, MinValue: 1, MaxValue: math.MaxInt64, AllowAutoValue: true},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBWindowConcurrency, Value: strconv.Itoa(DefTiDBWindowConcurrency), Type: TypeInt, MinValue: 1, MaxValue: math.MaxInt64, AllowAutoValue: true},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBMergeJoinConcurrency, Value: strconv.Itoa(DefTiDBMergeJoinConcurrency), Type: TypeInt, MinValue: 1, MaxValue: math.MaxInt64, AllowAutoValue: true},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBStreamAggConcurrency, Value: strconv.Itoa(DefTiDBStreamAggConcurrency), Type: TypeInt, MinValue: 1, MaxValue: math.MaxInt64, AllowAutoValue: true},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableParallelApply, Value: BoolToOnOff(DefTiDBEnableParallelApply), Type: TypeBool},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBBackoffLockFast, Value: strconv.Itoa(kv.DefBackoffLockFast), Type: TypeUnsigned, MinValue: 1, MaxValue: math.MaxUint64},
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,9 @@ const (
// tidb_window_concurrency is deprecated, use tidb_executor_concurrency instead.
TiDBWindowConcurrency = "tidb_window_concurrency"

// tidb_merge_join_concurrency is used for merge join parallel executor
TiDBMergeJoinConcurrency = "tidb_merge_join_concurrency"

// tidb_stream_agg_concurrency is used for stream aggregation parallel executor.
// tidb_stream_agg_concurrency is deprecated, use tidb_executor_concurrency instead.
TiDBStreamAggConcurrency = "tidb_streamagg_concurrency"
Expand Down Expand Up @@ -548,6 +551,7 @@ const (
DefTiDBHashAggPartialConcurrency = ConcurrencyUnset
DefTiDBHashAggFinalConcurrency = ConcurrencyUnset
DefTiDBWindowConcurrency = ConcurrencyUnset
DefTiDBMergeJoinConcurrency = 1 // disable optimization by default
DefTiDBStreamAggConcurrency = 1
DefTiDBForcePriority = mysql.NoPriority
DefTiDBUseRadixJoin = false
Expand Down
2 changes: 1 addition & 1 deletion sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func CheckDeprecationSetSystemVar(s *SessionVars, name string) {
switch name {
case TiDBIndexLookupConcurrency, TiDBIndexLookupJoinConcurrency,
TiDBHashJoinConcurrency, TiDBHashAggPartialConcurrency, TiDBHashAggFinalConcurrency,
TiDBProjectionConcurrency, TiDBWindowConcurrency, TiDBStreamAggConcurrency:
TiDBProjectionConcurrency, TiDBWindowConcurrency, TiDBMergeJoinConcurrency, TiDBStreamAggConcurrency:
s.StmtCtx.AppendWarning(errWarnDeprecatedSyntax.FastGenByArgs(name, TiDBExecutorConcurrency))
case TIDBMemQuotaHashJoin, TIDBMemQuotaMergeJoin,
TIDBMemQuotaSort, TIDBMemQuotaTopn,
Expand Down
11 changes: 11 additions & 0 deletions sessionctx/variable/varsutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,14 @@ func (s *testVarsutilSuite) TestNewSessionVars(c *C) {
c.Assert(vars.hashAggPartialConcurrency, Equals, ConcurrencyUnset)
c.Assert(vars.hashAggFinalConcurrency, Equals, ConcurrencyUnset)
c.Assert(vars.windowConcurrency, Equals, ConcurrencyUnset)
c.Assert(vars.mergeJoinConcurrency, Equals, DefTiDBMergeJoinConcurrency)
c.Assert(vars.streamAggConcurrency, Equals, DefTiDBStreamAggConcurrency)
c.Assert(vars.distSQLScanConcurrency, Equals, DefDistSQLScanConcurrency)
c.Assert(vars.ProjectionConcurrency(), Equals, DefExecutorConcurrency)
c.Assert(vars.HashAggPartialConcurrency(), Equals, DefExecutorConcurrency)
c.Assert(vars.HashAggFinalConcurrency(), Equals, DefExecutorConcurrency)
c.Assert(vars.WindowConcurrency(), Equals, DefExecutorConcurrency)
c.Assert(vars.MergeJoinConcurrency(), Equals, DefTiDBMergeJoinConcurrency)
c.Assert(vars.StreamAggConcurrency(), Equals, DefTiDBStreamAggConcurrency)
c.Assert(vars.DistSQLScanConcurrency(), Equals, DefDistSQLScanConcurrency)
c.Assert(vars.ExecutorConcurrency, Equals, DefExecutorConcurrency)
Expand Down Expand Up @@ -667,6 +669,14 @@ func (s *testVarsutilSuite) TestConcurrencyVariables(c *C) {
c.Assert(vars.windowConcurrency, Equals, wdConcurrency)
c.Assert(vars.WindowConcurrency(), Equals, wdConcurrency)

mjConcurrency := 2
c.Assert(vars.mergeJoinConcurrency, Equals, DefTiDBMergeJoinConcurrency)
c.Assert(vars.MergeJoinConcurrency(), Equals, DefTiDBMergeJoinConcurrency)
err = SetSessionSystemVar(vars, TiDBMergeJoinConcurrency, types.NewIntDatum(int64(mjConcurrency)))
c.Assert(err, IsNil)
c.Assert(vars.mergeJoinConcurrency, Equals, mjConcurrency)
c.Assert(vars.MergeJoinConcurrency(), Equals, mjConcurrency)

saConcurrency := 2
c.Assert(vars.streamAggConcurrency, Equals, DefTiDBStreamAggConcurrency)
c.Assert(vars.StreamAggConcurrency(), Equals, DefTiDBStreamAggConcurrency)
Expand All @@ -683,6 +693,7 @@ func (s *testVarsutilSuite) TestConcurrencyVariables(c *C) {
c.Assert(vars.indexLookupConcurrency, Equals, ConcurrencyUnset)
c.Assert(vars.IndexLookupConcurrency(), Equals, exeConcurrency)
c.Assert(vars.WindowConcurrency(), Equals, wdConcurrency)
c.Assert(vars.MergeJoinConcurrency(), Equals, mjConcurrency)
c.Assert(vars.StreamAggConcurrency(), Equals, saConcurrency)

}

0 comments on commit cc0e1d4

Please sign in to comment.