executor: track the memory usage of IndexLookUpExecutor (pingcap#6009)
zz-jason authored Mar 20, 2018
1 parent 67c948b commit 4886dcb
Showing 4 changed files with 101 additions and 46 deletions.
84 changes: 65 additions & 19 deletions executor/distsql.go
@@ -20,6 +20,7 @@ import (
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/juju/errors"
"github.com/opentracing/opentracing-go"
@@ -36,6 +37,7 @@ import (
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/ranger"
tipb "github.com/pingcap/tipb/go-tipb"
log "github.com/sirupsen/logrus"
@@ -66,6 +68,20 @@ type lookupTableTask struct {
// The handles fetched from the index are originally ordered by index, but we need them to be ordered by handle
// to do the table request.
indexOrder map[int64]int

// memUsage records the memory usage of this task calculated by table worker.
// memTracker is used to release memUsage after task is done and unused.
//
// The sequence of function calls is:
// 1. calculate task.memUsage.
// 2. task.memTracker = tableWorker.memTracker
// 3. task.memTracker.Consume(task.memUsage)
// 4. task.memTracker.Consume(-task.memUsage)
//
// Steps 1~3 are completed in "tableWorker.executeTask".
// Step 4 is completed in "IndexLookUpExecutor.NextChunk".
memUsage int64
memTracker *memory.Tracker
}

func (task *lookupTableTask) Len() int {
@@ -502,6 +518,9 @@ type IndexLookUpExecutor struct {
resultCurr *lookupTableTask
feedback *statistics.QueryFeedback

// memTracker is used to track the memory usage of this executor.
memTracker *memory.Tracker

// isCheckOp is used to determine whether we need to check the consistency of the index data.
isCheckOp bool
}
@@ -521,6 +540,13 @@ func (e *IndexLookUpExecutor) Open(ctx context.Context) error {
}

func (e *IndexLookUpExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) error {
// We have to initialize "memTracker" and other execution resources here
// instead of in function "Open", because this "IndexLookUpExecutor" may be
// constructed by an "IndexLookUpJoin" and "Open" will not be called in that
// situation.
e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaIndexLookupReader)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)

span, ctx := startSpanFollowsContext(ctx, "executor.IndexLookUp.Open")
defer span.Finish()

@@ -600,7 +626,9 @@ func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-cha
keepOrder: e.keepOrder,
handleIdx: e.handleIdx,
isCheckOp: e.isCheckOp,
memTracker: memory.NewTracker("tableWorker", -1),
}
worker.memTracker.AttachTo(e.memTracker)
ctx1, cancel := context.WithCancel(ctx)
go func() {
worker.pickAndExecTask(ctx1)
@@ -691,10 +719,14 @@ func (e *IndexLookUpExecutor) getResultTask() (*lookupTableTask, error) {
if !ok {
return nil, nil
}
err := <-task.doneCh
if err != nil {
if err := <-task.doneCh; err != nil {
return nil, errors.Trace(err)
}

// Release the memory usage of the last task before we handle a new task.
if e.resultCurr != nil {
e.resultCurr.memTracker.Consume(-e.resultCurr.memUsage)
}
e.resultCurr = task
return e.resultCurr, nil
}
@@ -761,13 +793,12 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes
func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult) (handles []int64, err error) {
handles = make([]int64, 0, w.batchSize)
for len(handles) < w.batchSize {
e0 := idxResult.NextChunk(ctx, chk)
if e0 != nil {
err = errors.Trace(e0)
return
err = errors.Trace(idxResult.NextChunk(ctx, chk))
if err != nil {
return handles, err
}
if chk.NumRows() == 0 {
return
return handles, nil
}
for i := 0; i < chk.NumRows(); i++ {
handles = append(handles, chk.GetRow(i).GetInt64(0))
@@ -777,7 +808,7 @@ func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk,
if w.batchSize > w.maxBatchSize {
w.batchSize = w.maxBatchSize
}
return
return handles, nil
}

func (w *indexWorker) buildTableTask(handles []int64) *lookupTableTask {
@@ -805,6 +836,9 @@ type tableWorker struct {
keepOrder bool
handleIdx int

// memTracker is used to track the memory usage of this table worker.
memTracker *memory.Tracker

// isCheckOp is used to determine whether we need to check the consistency of the index data.
isCheckOp bool
}
@@ -831,50 +865,60 @@ func (w *tableWorker) pickAndExecTask(ctx context.Context) {
if !ok {
return
}
w.executeTask(ctx, task)
case <-w.finished:
return
}
err := w.executeTask(ctx, task)
task.doneCh <- errors.Trace(err)
}
}

// executeTask executes the table lookup task. We construct a table reader and send requests by handles.
// Then we keep the returned rows and finish this task.
func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) {
var err error
defer func() {
task.doneCh <- errors.Trace(err)
}()
var tableReader Executor
tableReader, err = w.buildTblReader(ctx, task.handles)
func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) error {
tableReader, err := w.buildTblReader(ctx, task.handles)
if err != nil {
log.Error(err)
return
return errors.Trace(err)
}
defer terror.Call(tableReader.Close)

task.memTracker = w.memTracker
memUsage := int64(cap(task.handles) * 8)
task.memUsage = memUsage
task.memTracker.Consume(memUsage)
handleCnt := len(task.handles)
task.rows = make([]chunk.Row, 0, handleCnt)
for {
chk := tableReader.newChunk()
err = tableReader.NextChunk(ctx, chk)
if err != nil {
log.Error(err)
return
return errors.Trace(err)
}
if chk.NumRows() == 0 {
break
}
memUsage = chk.MemoryUsage()
task.memUsage += memUsage
task.memTracker.Consume(memUsage)
iter := chunk.NewIterator4Chunk(chk)
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
task.rows = append(task.rows, row)
}
}
memUsage = int64(cap(task.rows)) * int64(unsafe.Sizeof(chunk.Row{}))
task.memUsage += memUsage
task.memTracker.Consume(memUsage)
if w.keepOrder {
task.rowIdx = make([]int, 0, len(task.rows))
for i := range task.rows {
handle := task.rows[i].GetInt64(w.handleIdx)
task.rowIdx = append(task.rowIdx, task.indexOrder[handle])
}
memUsage = int64(cap(task.rowIdx) * 4)
task.memUsage += memUsage
task.memTracker.Consume(memUsage)
sort.Sort(task)
}

@@ -884,9 +928,11 @@ func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) {
handle := row.GetInt64(w.handleIdx)
obtainedHandlesMap[handle] = struct{}{}
}
err = errors.Errorf("handle count %d isn't equal to value count %d, missing handles %v in a batch",
return errors.Errorf("handle count %d isn't equal to value count %d, missing handles %v in a batch",
handleCnt, len(task.rows), GetLackHandles(task.handles, obtainedHandlesMap))
}

return nil
}

// GetLackHandles gets the handles in expectedHandles but not in obtainedHandlesMap.
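The comment added to lookupTableTask spells out the accounting protocol: the table worker estimates how many bytes a task occupies (8 bytes per handle, the fetched chunks via chk.MemoryUsage(), the rows and rowIdx slices), charges that amount to its tracker, and IndexLookUpExecutor.NextChunk later charges the negative amount to release it once the task has been consumed. The sketch below illustrates that attach/consume/release flow with a deliberately simplified stand-in for util/memory.Tracker; the real tracker additionally takes a byte limit (as passed to NewTracker above) and reacts when it is exceeded, which this sketch omits.

package main

import (
	"fmt"
	"sync/atomic"
)

// tracker is a simplified stand-in for util/memory.Tracker, kept only to
// illustrate the attach/consume/release sequence; it does not enforce quotas.
type tracker struct {
	label    string
	consumed int64
	parent   *tracker
}

func newTracker(label string) *tracker { return &tracker{label: label} }

// AttachTo makes parent account for everything this tracker consumes.
func (t *tracker) AttachTo(parent *tracker) { t.parent = parent }

// Consume adds bytes (possibly negative) to this tracker and every ancestor,
// so the statement-level tracker always sees the total of its children.
func (t *tracker) Consume(bytes int64) {
	for cur := t; cur != nil; cur = cur.parent {
		atomic.AddInt64(&cur.consumed, bytes)
	}
}

func main() {
	stmt := newTracker("statement")           // plays the role of StmtCtx.MemTracker
	exec := newTracker("IndexLookUpExecutor") // e.memTracker
	worker := newTracker("tableWorker")       // worker.memTracker
	exec.AttachTo(stmt)
	worker.AttachTo(exec)

	// Steps 1~3 (tableWorker.executeTask): estimate and charge the task's memory,
	// e.g. 8 bytes per handle plus the chunks and the rows/rowIdx slices.
	task := struct {
		memUsage   int64
		memTracker *tracker
	}{memUsage: 20000 * 8, memTracker: worker}
	task.memTracker.Consume(task.memUsage)
	fmt.Println("after charge:", atomic.LoadInt64(&stmt.consumed)) // 160000

	// Step 4 (IndexLookUpExecutor.NextChunk): release it once the task is done.
	task.memTracker.Consume(-task.memUsage)
	fmt.Println("after release:", atomic.LoadInt64(&stmt.consumed)) // 0
}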
5 changes: 5 additions & 0 deletions sessionctx/variable/session.go
@@ -287,6 +287,8 @@ type SessionVars struct {
MemQuotaSort int64
// MemQuotaTopn defines the memory quota for a top n executor.
MemQuotaTopn int64
// MemQuotaIndexLookupReader defines the memory quota for an index lookup reader executor.
MemQuotaIndexLookupReader int64

// EnableStreaming indicates whether the coprocessor request can use streaming API.
// TODO: remove this after tidb-server configuration "enable-streaming" is removed.
@@ -321,6 +323,7 @@ func NewSessionVars() *SessionVars {
MemQuotaHashJoin: DefTiDBMemQuotaHashJoin,
MemQuotaSort: DefTiDBMemQuotaSort,
MemQuotaTopn: DefTiDBMemQuotaTopn,
MemQuotaIndexLookupReader: DefTiDBMemQuotaIndexLookupReader,
}
var enableStreaming string
if config.GetGlobalConfig().EnableStreaming {
@@ -499,6 +502,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.MemQuotaSort = tidbOptInt64(val, DefTiDBMemQuotaSort)
case TIDBMemQuotaTopn:
s.MemQuotaTopn = tidbOptInt64(val, DefTiDBMemQuotaTopn)
case TIDBMemQuotaIndexLookupReader:
s.MemQuotaIndexLookupReader = tidbOptInt64(val, DefTiDBMemQuotaIndexLookupReader)
case TiDBGeneralLog:
atomic.StoreUint32(&ProcessGeneralLog, uint32(tidbOptPositiveInt(val, DefTiDBGeneralLog)))
case TiDBEnableStreaming:
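In SetSystemVar, the new case parses the user-supplied string with tidbOptInt64 and falls back to DefTiDBMemQuotaIndexLookupReader when the value does not parse. A minimal stand-alone sketch of that parse-with-default pattern follows; the helper here is hypothetical and only mirrors the behaviour implied by the diff, not TiDB's actual implementation.

package main

import (
	"fmt"
	"strconv"
)

// optInt64 mirrors the behaviour implied by tidbOptInt64 in the diff:
// an unparsable value silently falls back to the compiled-in default.
func optInt64(val string, def int64) int64 {
	v, err := strconv.ParseInt(val, 10, 64)
	if err != nil {
		return def
	}
	return v
}

func main() {
	const defQuota int64 = 32 << 30 // DefTiDBMemQuotaIndexLookupReader, 32GB

	fmt.Println(optInt64("1073741824", defQuota))   // 1073741824 (user set 1GB)
	fmt.Println(optInt64("not-a-number", defQuota)) // 34359738368 (falls back to 32GB)
}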
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
@@ -625,6 +625,7 @@ var defaultSysVars = []*SysVar{
{ScopeSession, TIDBMemQuotaHashJoin, strconv.FormatInt(DefTiDBMemQuotaHashJoin, 10)},
{ScopeSession, TIDBMemQuotaSort, strconv.FormatInt(DefTiDBMemQuotaSort, 10)},
{ScopeSession, TIDBMemQuotaTopn, strconv.FormatInt(DefTiDBMemQuotaTopn, 10)},
{ScopeSession, TIDBMemQuotaIndexLookupReader, strconv.FormatInt(DefTiDBMemQuotaIndexLookupReader, 10)},
{ScopeSession, TiDBEnableStreaming, "0"},
/* The following variable is defined as session scope but is actually server scope. */
{ScopeSession, TiDBGeneralLog, strconv.Itoa(DefTiDBGeneralLog)},
57 changes: 30 additions & 27 deletions sessionctx/variable/tidb_vars.go
@@ -109,14 +109,16 @@ const (
TiDBMaxChunkSize = "tidb_max_chunk_size"

// The following session variables control the memory quota during query execution.
// "tidb_mem_quota_query": control the memory quota of a query.
// "tidb_mem_quota_hashjoin": control the memory quota of "HashJoinExec".
// "tidb_mem_quota_sort": control the memory quota of "SortExec".
// "tidb_mem_quota_topn": control the memory quota of "TopNExec".
TIDBMemQuotaQuery = "tidb_mem_quota_query" // Bytes.
TIDBMemQuotaHashJoin = "tidb_mem_quota_hashjoin" // Bytes.
TIDBMemQuotaSort = "tidb_mem_quota_sort" // Bytes.
TIDBMemQuotaTopn = "tidb_mem_quota_topn" // Bytes.
// "tidb_mem_quota_query": control the memory quota of a query.
// "tidb_mem_quota_hashjoin": control the memory quota of "HashJoinExec".
// "tidb_mem_quota_sort": control the memory quota of "SortExec".
// "tidb_mem_quota_topn": control the memory quota of "TopNExec".
// "tidb_mem_quota_indexlookupreader": control the memory quota of "IndexLookUpExecutor".
TIDBMemQuotaQuery = "tidb_mem_quota_query" // Bytes.
TIDBMemQuotaHashJoin = "tidb_mem_quota_hashjoin" // Bytes.
TIDBMemQuotaSort = "tidb_mem_quota_sort" // Bytes.
TIDBMemQuotaTopn = "tidb_mem_quota_topn" // Bytes.
TIDBMemQuotaIndexLookupReader = "tidb_mem_quota_indexlookupreader" // Bytes.

// tidb_general_log is used to log every query in the server in info level.
TiDBGeneralLog = "tidb_general_log"
@@ -127,25 +129,26 @@

// Default TiDB system variable values.
const (
DefIndexLookupConcurrency = 4
DefIndexSerialScanConcurrency = 1
DefIndexJoinBatchSize = 25000
DefIndexLookupSize = 20000
DefDistSQLScanConcurrency = 15
DefBuildStatsConcurrency = 4
DefSkipUTF8Check = false
DefOptAggPushDown = false
DefOptInSubqUnfolding = false
DefBatchInsert = false
DefBatchDelete = false
DefCurretTS = 0
DefMaxChunkSize = 1024
DefDMLBatchSize = 20000
DefTiDBMemQuotaQuery = 32 * 1024 * 1024 * 1024 // 32GB.
DefTiDBMemQuotaHashJoin = 32 * 1024 * 1024 * 1024 // 32GB.
DefTiDBMemQuotaSort = 32 * 1024 * 1024 * 1024 // 32GB.
DefTiDBMemQuotaTopn = 32 * 1024 * 1024 * 1024 // 32GB.
DefTiDBGeneralLog = 0
DefIndexLookupConcurrency = 4
DefIndexSerialScanConcurrency = 1
DefIndexJoinBatchSize = 25000
DefIndexLookupSize = 20000
DefDistSQLScanConcurrency = 15
DefBuildStatsConcurrency = 4
DefSkipUTF8Check = false
DefOptAggPushDown = false
DefOptInSubqUnfolding = false
DefBatchInsert = false
DefBatchDelete = false
DefCurretTS = 0
DefMaxChunkSize = 1024
DefDMLBatchSize = 20000
DefTiDBMemQuotaQuery = 32 << 30 // 32GB.
DefTiDBMemQuotaHashJoin = 32 << 30 // 32GB.
DefTiDBMemQuotaSort = 32 << 30 // 32GB.
DefTiDBMemQuotaTopn = 32 << 30 // 32GB.
DefTiDBMemQuotaIndexLookupReader = 32 << 30 // 32GB.
DefTiDBGeneralLog = 0
)

// Process global variables.
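The default-value block is also realigned, and the 32GB defaults are rewritten from 32 * 1024 * 1024 * 1024 to the shift form 32 << 30. The two spellings denote the same number of bytes, as this small check shows:

package main

import "fmt"

func main() {
	// 32 << 30 shifts 32 left by 30 bits, i.e. multiplies it by 2^30 (one GiB).
	fmt.Println(32<<30 == 32*1024*1024*1024) // true
	fmt.Println(int64(32 << 30))             // 34359738368 bytes
}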
