Skip to content

Commit

Permalink
[store/tikv] support batch coprocessor for TiFlash (pingcap#16030)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanfei1991 authored Apr 9, 2020
1 parent 7b0e990 commit af0a49c
Show file tree
Hide file tree
Showing 18 changed files with 598 additions and 7 deletions.
6 changes: 6 additions & 0 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ func (builder *RequestBuilder) SetStoreType(storeType kv.StoreType) *RequestBuil
return builder
}

// SetAllowBatchCop sets `BatchCop` property.
func (builder *RequestBuilder) SetAllowBatchCop(batchCop bool) *RequestBuilder {
builder.Request.BatchCop = batchCop
return builder
}

func (builder *RequestBuilder) getIsolationLevel() kv.IsoLevel {
switch builder.Tp {
case kv.ReqTypeAnalyze:
Expand Down
21 changes: 21 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2207,6 +2207,26 @@ func containsLimit(execs []*tipb.Executor) bool {
return false
}

// When allow batch cop is 1, only agg / topN uses batch cop.
// When allow batch cop is 2, every query uses batch cop.
func (e *TableReaderExecutor) setBatchCop(v *plannercore.PhysicalTableReader) {
if e.storeType != kv.TiFlash || e.keepOrder {
return
}
switch e.ctx.GetSessionVars().AllowBatchCop {
case 1:
for _, p := range v.TablePlans {
switch p.(type) {
case *plannercore.PhysicalHashAgg, *plannercore.PhysicalStreamAgg, *plannercore.PhysicalTopN:
e.batchCop = true
}
}
case 2:
e.batchCop = true
}
return
}

func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableReader) (*TableReaderExecutor, error) {
dagReq, streaming, err := b.constructDAGReq(v.TablePlans)
if err != nil {
Expand Down Expand Up @@ -2237,6 +2257,7 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
plans: v.TablePlans,
storeType: v.StoreType,
}
e.setBatchCop(v)
e.buildVirtualColumnInfo()
if containsLimit(dagReq.Executors) {
e.feedback = statistics.NewQueryFeedback(0, nil, 0, ts.Desc)
Expand Down
3 changes: 3 additions & 0 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ type TableReaderExecutor struct {
virtualColumnIndex []int
// virtualColumnRetFieldTypes records the RetFieldTypes of virtual columns.
virtualColumnRetFieldTypes []*types.FieldType
// batchCop indicates whether use super batch coprocessor request, only works for TiFlash engine.
batchCop bool
}

// Open initialzes necessary variables for using this executor.
Expand Down Expand Up @@ -201,6 +203,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
SetFromSessionVars(e.ctx.GetSessionVars()).
SetMemTracker(e.memTracker).
SetStoreType(e.storeType).
SetAllowBatchCop(e.batchCop).
Build()
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ require (
github.com/pingcap/failpoint v0.0.0-20200210140405-f8f9fb234798
github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20200330093347-98f910b71904
github.com/pingcap/kvproto v0.0.0-20200409034505-a5af800ca2ef
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd
github.com/pingcap/parser v0.0.0-20200407074807-436f1c8c4cff
github.com/pingcap/pd/v4 v4.0.0-beta.1.0.20200305072537-61d9f9cc35d3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,8 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17Xtb
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20200214064158-62d31900d88e/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200221034943-a2aa1d1e20a8/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200330093347-98f910b71904 h1:pMFUXvhJ62hX8m0Q4RsL7L+hSW1mAMG26So5eFMoAtI=
github.com/pingcap/kvproto v0.0.0-20200330093347-98f910b71904/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200409034505-a5af800ca2ef h1:t+bOucRUlIlzW+6S32qG8ufu4iC8F8LEld4Rdhhp1Aw=
github.com/pingcap/kvproto v0.0.0-20200409034505-a5af800ca2ef/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd h1:CV3VsP3Z02MVtdpTMfEgRJ4T9NGgGTxdHpJerent7rM=
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
Expand Down
2 changes: 2 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,8 @@ type Request struct {
Cacheable bool
// SchemaVer is for any schema-ful storage to validate schema correctness if necessary.
SchemaVar int64
// BatchCop indicates whether send batch coprocessor request to tiflash.
BatchCop bool
}

// ResultSubset represents a result subset from a single storage unit.
Expand Down
1 change: 1 addition & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1947,6 +1947,7 @@ var builtinGlobalVariable = []string{
variable.TiDBEnableNoopFuncs,
variable.TiDBEnableIndexMerge,
variable.TiDBTxnMode,
variable.TiDBAllowBatchCop,
variable.TiDBRowFormatVersion,
variable.TiDBEnableStmtSummary,
variable.TiDBStmtSummaryInternalQuery,
Expand Down
8 changes: 8 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,10 @@ type SessionVars struct {
// This variable is currently not recommended to be turned on.
AllowWriteRowID bool

// AllowBatchCop means if we should send batch coprocessor to TiFlash. Default value is 1, means to use batch cop in case of aggregation and join.
// If value is set to 2 , which means to force to send batch cop for any query. Value is set to 0 means never use batch cop.
AllowBatchCop int

// CorrelationThreshold is the guard to enable row count estimation using column order correlation.
CorrelationThreshold float64

Expand Down Expand Up @@ -717,6 +721,8 @@ func NewSessionVars() *SessionVars {
}
terror.Log(vars.SetSystemVar(TiDBEnableStreaming, enableStreaming))

vars.AllowBatchCop = DefTiDBAllowBatchCop

var enableChunkRPC string
if config.GetGlobalConfig().TiKVClient.EnableChunkRPC {
enableChunkRPC = "1"
Expand Down Expand Up @@ -1076,6 +1082,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.IndexLookupJoinConcurrency = tidbOptPositiveInt32(val, DefIndexLookupJoinConcurrency)
case TiDBIndexJoinBatchSize:
s.IndexJoinBatchSize = tidbOptPositiveInt32(val, DefIndexJoinBatchSize)
case TiDBAllowBatchCop:
s.AllowBatchCop = int(tidbOptInt64(val, DefTiDBAllowBatchCop))
case TiDBIndexLookupSize:
s.IndexLookupSize = tidbOptPositiveInt32(val, DefIndexLookupSize)
case TiDBHashJoinConcurrency:
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,7 @@ var defaultSysVars = []*SysVar{
{ScopeSession, TiDBDMLBatchSize, strconv.Itoa(DefDMLBatchSize)},
{ScopeSession, TiDBCurrentTS, strconv.Itoa(DefCurretTS)},
{ScopeGlobal | ScopeSession, TiDBMaxChunkSize, strconv.Itoa(DefMaxChunkSize)},
{ScopeGlobal | ScopeSession, TiDBAllowBatchCop, strconv.Itoa(DefTiDBAllowBatchCop)},
{ScopeGlobal | ScopeSession, TiDBInitChunkSize, strconv.Itoa(DefInitChunkSize)},
{ScopeGlobal | ScopeSession, TiDBEnableCascadesPlanner, "0"},
{ScopeGlobal | ScopeSession, TiDBEnableIndexMerge, "0"},
Expand Down
5 changes: 5 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,10 @@ const (
// TiDBMaxChunkSize is used to control the max chunk size during query execution.
TiDBMaxChunkSize = "tidb_max_chunk_size"

// TiDBAllowBatchCop means if we should send batch coprocessor to TiFlash. Default value is 1, means to use batch cop in case of aggregation and join.
// If value is set to 2 , which means to force to send batch cop for any query. Value is set to 0 means never use batch cop.
TiDBAllowBatchCop = "tidb_allow_batch_cop"

// TiDBInitChunkSize is used to control the init chunk size during query execution.
TiDBInitChunkSize = "tidb_init_chunk_size"

Expand Down Expand Up @@ -442,6 +446,7 @@ const (
DefTiDBHashJoinConcurrency = 5
DefTiDBProjectionConcurrency = 4
DefTiDBOptimizerSelectivityLevel = 0
DefTiDBAllowBatchCop = 1
DefTiDBTxnMode = ""
DefTiDBRowFormatV1 = 1
DefTiDBRowFormatV2 = 2
Expand Down
9 changes: 9 additions & 0 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,15 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
return value, ErrWrongValueForVar.GenWithStackByArgs(name, value)
}
return value, nil
case TiDBAllowBatchCop:
v, err := strconv.ParseInt(value, 10, 64)
if err != nil {
return value, ErrWrongTypeForVar.GenWithStackByArgs(name)
}
if v < 0 || v > 2 {
return value, ErrWrongValueForVar.GenWithStackByArgs(name, value)
}
return value, nil
case TiDBOptCPUFactor,
TiDBOptCopCPUFactor,
TiDBOptNetworkFactor,
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/varsutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (s *testVarsutilSuite) TestNewSessionVars(c *C) {
c.Assert(vars.IndexSerialScanConcurrency, Equals, DefIndexSerialScanConcurrency)
c.Assert(vars.IndexLookupJoinConcurrency, Equals, DefIndexLookupJoinConcurrency)
c.Assert(vars.HashJoinConcurrency, Equals, DefTiDBHashJoinConcurrency)
c.Assert(vars.AllowBatchCop, Equals, DefTiDBAllowBatchCop)
c.Assert(vars.ProjectionConcurrency, Equals, int64(DefTiDBProjectionConcurrency))
c.Assert(vars.HashAggPartialConcurrency, Equals, DefTiDBHashAggPartialConcurrency)
c.Assert(vars.HashAggFinalConcurrency, Equals, DefTiDBHashAggFinalConcurrency)
Expand Down
Loading

0 comments on commit af0a49c

Please sign in to comment.